]> git.proxmox.com Git - mirror_ovs.git/blame - python/ovs/db/idl.py
ovs-monitor-ipsec: Add unixctl support.
[mirror_ovs.git] / python / ovs / db / idl.py
CommitLineData
ad0991e6 1# Copyright (c) 2009, 2010, 2011, 2012 Nicira Networks
99155935
BP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at:
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
8cdf0349 15import uuid
99155935
BP
16
17import ovs.jsonrpc
4c0f6271 18import ovs.db.parser
99155935
BP
19import ovs.db.schema
20from ovs.db import error
21import ovs.ovsuuid
8cdf0349 22import ovs.poller
3a656eaf
EJ
23import ovs.vlog
24
25vlog = ovs.vlog.Vlog("idl")
99155935 26
26bb0f31
EJ
27__pychecker__ = 'no-classattr no-objattrs'
28
29
99155935
BP
class Idl:
    """Open vSwitch Database Interface Definition Language (OVSDB IDL).

    The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
    requests to an OVSDB database server and parses the responses, converting
    raw JSON into data structures that are easier for clients to digest.

    The IDL also assists with issuing database transactions.  The client
    creates a transaction, manipulates the IDL data structures, and commits or
    aborts the transaction.  The IDL then composes and issues the necessary
    JSON-RPC requests and reports to the client whether the transaction
    completed successfully.

    The client is allowed to access the following attributes directly, in a
    read-only fashion:

    - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
      to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
      annotated with a new attribute 'rows', which is a dict from a uuid.UUID
      to a Row object.

      The client may directly read and write the Row objects referenced by the
      'rows' map values.  Refer to Row for more details.

    - 'change_seqno': A number that represents the IDL's state.  When the IDL
      is updated (by Idl.run()), its value changes.

    - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
      if no lock is configured.

    - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
      lock, and False otherwise.

      Locking and unlocking happens asynchronously from the database client's
      point of view, so the information is only useful for optimization
      (e.g. if the client doesn't have the lock then there's no point in trying
      to write to the database).

    - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
      the database server has indicated that some other client already owns the
      requested lock, and False otherwise.

    - 'txn': The ovs.db.idl.Transaction object for the database transaction
      currently being constructed, if there is one, or None otherwise.
    """

    def __init__(self, remote, schema):
        """Creates and returns a connection to the database described by
        'schema' on 'remote', which should be in a form acceptable to
        ovs.jsonrpc.Session.open().  The connection will maintain an in-memory
        replica of the remote database.

        'schema' should be the schema for the remote database.  The caller may
        have cut it down by removing tables or columns that are not of
        interest.  The IDL will only replicate the tables and columns that
        remain.  The caller may also add an attribute named 'alert' to selected
        remaining columns, setting its value to False; if so, then changes to
        those columns will not be considered changes to the database for the
        purpose of the return value of Idl.run() and Idl.change_seqno.  This is
        useful for columns that the IDL's client will write but not read.

        As a convenience to users, 'schema' may also be an instance of the
        SchemaHelper class.

        The IDL uses and modifies 'schema' directly."""

        if isinstance(schema, SchemaHelper):
            schema = schema.get_idl_schema()

        self.tables = schema.tables
        self._db = schema
        self._session = ovs.jsonrpc.Session.open(remote)
        self._monitor_request_id = None
        self._last_seqno = None
        self.change_seqno = 0

        # Database locking.
        self.lock_name = None          # Name of lock we need, None if none.
        self.has_lock = False          # Has db server said we have the lock?
        self.is_lock_contended = False  # Has db server said we can't get lock?
        self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.

        # Transaction support.
        self.txn = None
        self._outstanding_txns = {}

        for table in schema.tables.itervalues():
            for column in table.columns.itervalues():
                # Columns alert on change unless the caller opted out.
                if not hasattr(column, 'alert'):
                    column.alert = True
            table.need_table = False
            table.rows = {}
            table.idl = self

    def close(self):
        """Closes the connection to the database.  The IDL will no longer
        update."""
        self._session.close()

    def run(self):
        """Processes a batch of messages from the database server.  Returns
        True if the database as seen through the IDL changed, False if it did
        not change.  The initial fetch of the entire contents of the remote
        database is considered to be one kind of change.  If the IDL has been
        configured to acquire a database lock (with Idl.set_lock()), then
        successfully acquiring the lock is also considered to be a change.

        This function can return occasional false positives, that is, report
        that the database changed even though it didn't.  This happens if the
        connection to the database drops and reconnects, which causes the
        database contents to be reloaded even if they didn't change.  (It could
        also happen if the database server sends out a "change" that reflects
        what we already thought was in the database, but the database server is
        not supposed to do that.)

        As an alternative to checking the return value, the client may check
        for changes in self.change_seqno."""
        assert not self.txn
        initial_change_seqno = self.change_seqno
        self._session.run()

        # Handle at most 50 messages per call so one call cannot run forever.
        for _ in range(50):
            if not self._session.is_connected():
                break

            seqno = self._session.get_seqno()
            if seqno != self._last_seqno:
                # The session (re)connected: restart monitoring and locking.
                self._last_seqno = seqno
                self.__txn_abort_all()
                self.__send_monitor_request()
                if self.lock_name:
                    self.__send_lock_request()
                break

            msg = self._session.recv()
            if msg is None:
                break

            if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update"
                    and isinstance(msg.params, (list, tuple))
                    and len(msg.params) == 2
                    and msg.params[0] is None):
                # Database contents changed.
                self.__parse_update(msg.params[1])
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                  and self._monitor_request_id is not None
                  and self._monitor_request_id == msg.id):
                # Reply to our "monitor" request.
                try:
                    self.change_seqno += 1
                    self._monitor_request_id = None
                    self.__clear()
                    self.__parse_update(msg.result)
                except error.Error as e:
                    vlog.err("%s: parse error in received schema: %s"
                             % (self._session.get_name(), e))
                    self.__error()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                  and self._lock_request_id is not None
                  and self._lock_request_id == msg.id):
                # Reply to our "lock" request.
                self.__parse_lock_reply(msg.result)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                  and msg.method == "locked"):
                # We got our lock.
                self.__parse_lock_notify(msg.params, True)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                  and msg.method == "stolen"):
                # Someone else stole our lock.
                self.__parse_lock_notify(msg.params, False)
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                  and msg.id == "echo"):
                # Reply to our echo request.  Ignore it.  (This must test for
                # a reply: a notification carries no id, so it could never
                # match "echo".)
                pass
            elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                               ovs.jsonrpc.Message.T_REPLY)
                  and self.__txn_process_reply(msg)):
                # __txn_process_reply() did everything needed.
                pass
            else:
                # This can happen if a transaction is destroyed before we
                # receive the reply, so keep the log level low.
                vlog.dbg("%s: received unexpected %s message"
                         % (self._session.get_name(),
                            ovs.jsonrpc.Message.type_to_string(msg.type)))

        return initial_change_seqno != self.change_seqno

    def wait(self, poller):
        """Arranges for poller.block() to wake up when self.run() has something
        to do or when activity occurs on a transaction on 'self'."""
        self._session.wait(poller)
        self._session.recv_wait(poller)

    def has_ever_connected(self):
        """Returns True, if the IDL successfully connected to the remote
        database and retrieved its contents (even if the connection
        subsequently dropped and is in the process of reconnecting).  If so,
        then the IDL contains an atomic snapshot of the database's contents
        (but it might be arbitrarily old if the connection dropped).

        Returns False if the IDL has never connected or retrieved the
        database's contents.  If so, the IDL is empty."""
        return self.change_seqno != 0

    def force_reconnect(self):
        """Forces the IDL to drop its connection to the database and reconnect.
        In the meantime, the contents of the IDL will not change."""
        self._session.force_reconnect()

    def set_lock(self, lock_name):
        """If 'lock_name' is not None, configures the IDL to obtain the named
        lock from the database server and to avoid modifying the database when
        the lock cannot be acquired (that is, when another client has the same
        lock).

        If 'lock_name' is None, drops the locking requirement and releases the
        lock."""
        assert not self.txn
        assert not self._outstanding_txns

        if self.lock_name and (not lock_name or lock_name != self.lock_name):
            # Release previous lock.
            self.__send_unlock_request()
            self.lock_name = None
            self.is_lock_contended = False

        if lock_name and not self.lock_name:
            # Acquire new lock.
            self.lock_name = lock_name
            self.__send_lock_request()

    def __clear(self):
        """Empties every table's 'rows' map, bumping change_seqno if any table
        actually held rows."""
        changed = False

        for table in self.tables.itervalues():
            if table.rows:
                changed = True
                table.rows = {}

        if changed:
            self.change_seqno += 1

    def __update_has_lock(self, new_has_lock):
        """Records whether we hold the lock.  Gaining the lock counts as a
        database change and clears 'is_lock_contended'."""
        if new_has_lock and not self.has_lock:
            if self._monitor_request_id is None:
                self.change_seqno += 1
            else:
                # We're waiting for a monitor reply, so don't signal that the
                # database changed.  The monitor reply will increment
                # change_seqno anyhow.
                pass
            self.is_lock_contended = False
        self.has_lock = new_has_lock

    def __do_send_lock_request(self, method):
        """Sends a 'method' ("lock" or "unlock") request for self.lock_name if
        the session is connected.  Returns the request's JSON-RPC id, or None
        if nothing was sent.  Either way we no longer consider the lock
        held."""
        self.__update_has_lock(False)
        self._lock_request_id = None
        if self._session.is_connected():
            msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
            msg_id = msg.id
            self._session.send(msg)
        else:
            msg_id = None
        return msg_id

    def __send_lock_request(self):
        """Requests the lock and remembers the request id so that run() can
        recognize the reply."""
        self._lock_request_id = self.__do_send_lock_request("lock")

    def __send_unlock_request(self):
        """Releases the lock; the reply is not tracked."""
        self.__do_send_lock_request("unlock")

    def __parse_lock_reply(self, result):
        """Handles the reply to our "lock" request: we hold the lock only if
        'result' is a dict whose "locked" member is True; otherwise the lock
        is contended."""
        self._lock_request_id = None
        got_lock = isinstance(result, dict) and result.get("locked") is True
        self.__update_has_lock(got_lock)
        if not got_lock:
            self.is_lock_contended = True

    def __parse_lock_notify(self, params, new_has_lock):
        """Handles a "locked" (new_has_lock=True) or "stolen"
        (new_has_lock=False) notification.  Notifications that do not name our
        configured lock are ignored."""
        if (self.lock_name is not None
                and isinstance(params, (list, tuple))
                and params
                and params[0] == self.lock_name):
            self.__update_has_lock(new_has_lock)
            if not new_has_lock:
                self.is_lock_contended = True

    def __send_monitor_request(self):
        """Sends a "monitor" request covering every column of every table in
        self.tables and remembers its id for matching the reply."""
        monitor_requests = {}
        for table in self.tables.itervalues():
            monitor_requests[table.name] = {"columns": table.columns.keys()}
        msg = ovs.jsonrpc.Message.create_request(
            "monitor", [self._db.name, None, monitor_requests])
        self._monitor_request_id = msg.id
        self._session.send(msg)

    def __parse_update(self, update):
        """Applies the <table-updates> object 'update' to the replica, logging
        (rather than propagating) any parse error."""
        try:
            self.__do_parse_update(update)
        except error.Error as e:
            vlog.err("%s: error parsing update: %s"
                     % (self._session.get_name(), e))

    def __do_parse_update(self, table_updates):
        """Validates and applies 'table_updates', incrementing change_seqno
        for each row update that changed something.  Raises error.Error on
        malformed input."""
        if not isinstance(table_updates, dict):
            raise error.Error("<table-updates> is not an object",
                              table_updates)

        for table_name, table_update in table_updates.iteritems():
            table = self.tables.get(table_name)
            if not table:
                raise error.Error('<table-updates> includes unknown '
                                  'table "%s"' % table_name)

            if not isinstance(table_update, dict):
                raise error.Error('<table-update> for table "%s" is not '
                                  'an object' % table_name, table_update)

            for uuid_string, row_update in table_update.iteritems():
                if not ovs.ovsuuid.is_valid_string(uuid_string):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains bad UUID "%s" as member '
                                      'name' % (table_name, uuid_string),
                                      table_update)
                row_uuid = ovs.ovsuuid.from_string(uuid_string)

                if not isinstance(row_update, dict):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains <row-update> for %s that '
                                      'is not an object'
                                      % (table_name, uuid_string))

                parser = ovs.db.parser.Parser(row_update, "row-update")
                old = parser.get_optional("old", [dict])
                new = parser.get_optional("new", [dict])
                parser.finish()

                if not old and not new:
                    raise error.Error('<row-update> missing "old" and '
                                      '"new" members', row_update)

                if self.__process_update(table, row_uuid, old, new):
                    self.change_seqno += 1

    def __process_update(self, table, uuid, old, new):
        """Returns True if a column changed, False otherwise."""
        row = table.rows.get(uuid)
        changed = False
        if not new:
            # Delete row.
            if row:
                del table.rows[uuid]
                changed = True
            else:
                # XXX rate-limit
                vlog.warn("cannot delete missing row %s from table %s"
                          % (uuid, table.name))
        elif not old:
            # Insert row.
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
            else:
                # XXX rate-limit
                vlog.warn("cannot add existing row %s to table %s"
                          % (uuid, table.name))
            if self.__row_update(table, row, new):
                changed = True
        else:
            # Modify row.
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
                # XXX rate-limit
                vlog.warn("cannot modify missing row %s in table %s"
                          % (uuid, table.name))
            if self.__row_update(table, row, new):
                changed = True
        return changed

    def __row_update(self, table, row, row_json):
        """Stores the column values in 'row_json' into 'row'.  Returns True if
        any column whose 'alert' attribute is true changed value.  Unknown
        columns and unparseable values are logged and skipped."""
        changed = False
        for column_name, datum_json in row_json.iteritems():
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum = ovs.db.data.Datum.from_json(column.type, datum_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            if datum != row._data[column_name]:
                row._data[column_name] = datum
                if column.alert:
                    changed = True
            else:
                # Didn't really change but the OVSDB monitor protocol always
                # includes every value in a row.
                pass
        return changed

    def __create_row(self, table, uuid):
        """Adds to 'table' and returns a new Row for 'uuid' in which every
        column holds its type's default Datum."""
        data = {}
        for column in table.columns.itervalues():
            data[column.name] = ovs.db.data.Datum.default(column.type)
        row = table.rows[uuid] = Row(self, table, uuid, data)
        return row

    def __error(self):
        """Recovers from a protocol error by forcing a reconnection."""
        self._session.force_reconnect()

    def __txn_abort_all(self):
        """Marks every outstanding transaction as AGAIN_WAIT and forgets it."""
        while self._outstanding_txns:
            txn = self._outstanding_txns.popitem()[1]
            txn._status = Transaction.AGAIN_WAIT

    def __txn_process_reply(self, msg):
        """Hands 'msg' to the outstanding transaction whose request id matches
        msg.id.  Returns True if there was such a transaction, False
        otherwise, so that run() can tell handled replies from unexpected
        ones."""
        txn = self._outstanding_txns.pop(msg.id, None)
        if txn:
            txn._process_reply(msg)
            return True
        return False
456
26bb0f31 457
8cdf0349
BP
458def _uuid_to_row(atom, base):
459 if base.ref_table:
460 return base.ref_table.rows.get(atom)
461 else:
462 return atom
463
26bb0f31 464
8cdf0349
BP
def _row_to_uuid(value):
    """Returns value.uuid if 'value' is exactly a Row; any other value is
    returned unchanged."""
    if type(value) != Row:
        return value
    return value.uuid
bf6ec045 470
26bb0f31 471
class Row(object):
    """A row within an IDL.

    The client may access the following attributes directly:

    - 'uuid': a uuid.UUID object whose value is the row's database UUID.

    - An attribute for each column in the Row's table, named for the column,
      whose values are as returned by Datum.to_python() for the column's type.

      If some error occurs (e.g. the database server's idea of the column is
      different from the IDL's idea), then the attribute value is the
      "default" value returned by Datum.default() for the column's type.  (It
      is important to know this because the default value may violate
      constraints for the column's type, e.g. the default integer value is 0
      even if column constraints require the column's value to be positive.)

      When a transaction is active, column attributes may also be assigned new
      values.  Committing the transaction will then cause the new value to be
      stored into the database.

      *NOTE*: In the current implementation, the value of a column is a *copy*
      of the value in the database.  This means that modifying its value
      directly will have no useful effect.  For example, the following:
        row.mycolumn["a"] = "b"              # don't do this
      will not change anything in the database, even after commit.  To modify
      the column, instead assign the modified column value back to the column:
        d = row.mycolumn
        d["a"] = "b"
        row.mycolumn = d
    """

    def __init__(self, idl, table, uuid, data):
        # All of the explicit references to self.__dict__ below are required
        # to set real attributes without invoking self.__setattr__(), which
        # would treat the assignment as a column write.
        self.__dict__["uuid"] = uuid

        self.__dict__["_idl"] = idl
        self.__dict__["_table"] = table

        # _data is the committed data.  It takes the following values:
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     exists in the committed form of the database.
        #
        #   - None, if this row is newly inserted within the active transaction
        #     and thus has no committed form.
        self.__dict__["_data"] = data

        # _changes describes changes to this row within the active transaction.
        # It takes the following values:
        #
        #   - {}, the empty dictionary, if no transaction is active or if the
        #     row has yet not been changed within this transaction.
        #
        #   - A dictionary that maps a column name to its new Datum, if an
        #     active transaction changes those columns' values.
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     is newly inserted within the active transaction.
        #
        #   - None, if this transaction deletes this row.
        self.__dict__["_changes"] = {}

        # A dictionary whose keys are the names of columns that must be
        # verified as prerequisites when the transaction commits.  The values
        # in the dictionary are all None.
        self.__dict__["_prereqs"] = {}

    def __getattr__(self, column_name):
        """Returns the Python value of column 'column_name', preferring a
        value written within the active transaction over the committed one."""
        assert self._changes is not None

        datum = self._changes.get(column_name)
        if datum is None:
            datum = self._data[column_name]

        return datum.to_python(_uuid_to_row)

    def __setattr__(self, column_name, value):
        """Writes 'value' to column 'column_name' within the active
        transaction.  A value that cannot be converted to the column's type is
        logged and otherwise ignored."""
        assert self._changes is not None
        assert self._idl.txn

        column = self._table.columns[column_name]
        try:
            datum = ovs.db.data.Datum.from_python(column.type, value,
                                                  _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        self._idl.txn._write(self, column, datum)

    def verify(self, column_name):
        """Causes the original contents of column 'column_name' in this row to
        be verified as a prerequisite to completing the transaction.  That is,
        if 'column_name' changed in this row (or if this row was deleted)
        between the time that the IDL originally read its contents and the time
        that the transaction commits, then the transaction aborts and
        Transaction.commit() returns Transaction.AGAIN_WAIT or
        Transaction.AGAIN_NOW (depending on whether the database change has
        already been received).

        The intention is that, to ensure that no transaction commits based on
        dirty reads, an application should call Row.verify() on each data item
        read as part of a read-modify-write operation.

        In some cases Row.verify() reduces to a no-op, because the current
        value of the column is already known:

          - If this row is a row created by the current transaction (returned
            by Transaction.insert()).

          - If the column has already been modified within the current
            transaction.

        Because of the latter property, always call Row.verify() *before*
        modifying the column, for a given read-modify-write.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if not self._data or column_name in self._changes:
            return

        self._prereqs[column_name] = None

    def delete(self):
        """Deletes this row from its table.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if self._data is None:
            # Inserted and deleted within the same transaction: the server
            # never needs to hear about this row at all.
            del self._idl.txn._txn_rows[self.uuid]
        else:
            # Register the row with the transaction so that commit() emits
            # the delete and the transaction can restore the row to its table
            # when it finishes.
            self._idl.txn._txn_rows[self.uuid] = self
        self.__dict__["_changes"] = None
        del self._table.rows[self.uuid]
608
26bb0f31 609
8cdf0349
BP
610def _uuid_name_from_uuid(uuid):
611 return "row%s" % str(uuid).replace("-", "_")
612
26bb0f31 613
8cdf0349
BP
614def _where_uuid_equals(uuid):
615 return [["_uuid", "==", ["uuid", str(uuid)]]]
616
26bb0f31 617
8cdf0349
BP
618class _InsertedRow(object):
619 def __init__(self, op_index):
620 self.op_index = op_index
621 self.real = None
622
26bb0f31 623
8cdf0349
BP
624class Transaction(object):
625 # Status values that Transaction.commit() can return.
26bb0f31
EJ
626 UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
627 UNCHANGED = "unchanged" # Transaction didn't include any changes.
628 INCOMPLETE = "incomplete" # Commit in progress, please wait.
629 ABORTED = "aborted" # ovsdb_idl_txn_abort() called.
630 SUCCESS = "success" # Commit successful.
4fdfe5cc
BP
631 AGAIN_WAIT = "wait then try again"
632 # Commit failed because a "verify" operation
26bb0f31 633 # reported an inconsistency, due to a network
4fdfe5cc
BP
634 # problem, or other transient failure. Wait
635 # for a change, then try again.
636 AGAIN_NOW = "try again now" # Same as AGAIN_WAIT but try again right away.
26bb0f31
EJ
637 NOT_LOCKED = "not locked" # Server hasn't given us the lock yet.
638 ERROR = "error" # Commit failed due to a hard error.
8cdf0349
BP
639
640 @staticmethod
641 def status_to_string(status):
642 """Converts one of the status values that Transaction.commit() can
643 return into a human-readable string.
644
645 (The status values are in fact such strings already, so
646 there's nothing to do.)"""
647 return status
648
649 def __init__(self, idl):
650 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
651 A given Idl may only have a single active transaction at a time.
652
653 A Transaction may modify the contents of a database by assigning new
654 values to columns (attributes of Row), deleting rows (with
655 Row.delete()), or inserting rows (with Transaction.insert()). It may
656 also check that columns in the database have not changed with
657 Row.verify().
658
659 When a transaction is complete (which must be before the next call to
660 Idl.run()), call Transaction.commit() or Transaction.abort()."""
661 assert idl.txn is None
662
663 idl.txn = self
664 self._request_id = None
665 self.idl = idl
666 self.dry_run = False
667 self._txn_rows = {}
668 self._status = Transaction.UNCOMMITTED
669 self._error = None
670 self._comments = []
4fdfe5cc 671 self._commit_seqno = self.idl.change_seqno
8cdf0349
BP
672
673 self._inc_table = None
674 self._inc_column = None
675 self._inc_where = None
676
26bb0f31 677 self._inserted_rows = {} # Map from UUID to _InsertedRow
8cdf0349
BP
678
679 def add_comment(self, comment):
680 """Appens 'comment' to the comments that will be passed to the OVSDB
681 server when this transaction is committed. (The comment will be
682 committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
683 relatively human-readable form.)"""
684 self._comments.append(comment)
685
686 def increment(self, table, column, where):
687 assert not self._inc_table
688 self._inc_table = table
689 self._inc_column = column
690 self._inc_where = where
691
692 def wait(self, poller):
693 if self._status not in (Transaction.UNCOMMITTED,
694 Transaction.INCOMPLETE):
695 poller.immediate_wake()
696
697 def _substitute_uuids(self, json):
698 if type(json) in (list, tuple):
699 if (len(json) == 2
700 and json[0] == 'uuid'
701 and ovs.ovsuuid.is_valid_string(json[1])):
702 uuid = ovs.ovsuuid.from_string(json[1])
703 row = self._txn_rows.get(uuid, None)
704 if row and row._data is None:
705 return ["named-uuid", _uuid_name_from_uuid(uuid)]
706 return json
707
708 def __disassemble(self):
709 self.idl.txn = None
710
711 for row in self._txn_rows.itervalues():
712 if row._changes is None:
713 row._table.rows[row.uuid] = row
714 elif row._data is None:
715 del row._table.rows[row.uuid]
716 row.__dict__["_changes"] = {}
717 row.__dict__["_prereqs"] = {}
718 self._txn_rows = {}
719
720 def commit(self):
721 """Attempts to commit this transaction and returns the status of the
722 commit operation, one of the constants declared as class attributes.
723 If the return value is Transaction.INCOMPLETE, then the transaction is
724 not yet complete and the caller should try calling again later, after
725 calling Idl.run() to run the Idl.
726
727 Committing a transaction rolls back all of the changes that it made to
728 the Idl's copy of the database. If the transaction commits
729 successfully, then the database server will send an update and, thus,
730 the Idl will be updated with the committed changes."""
731 # The status can only change if we're the active transaction.
732 # (Otherwise, our status will change only in Idl.run().)
733 if self != self.idl.txn:
734 return self._status
735
736 # If we need a lock but don't have it, give up quickly.
737 if self.idl.lock_name and not self.idl.has_lock():
738 self._status = Transaction.NOT_LOCKED
739 self.__disassemble()
740 return self._status
741
742 operations = [self.idl._db.name]
743
744 # Assert that we have the required lock (avoiding a race).
745 if self.idl.lock_name:
746 operations.append({"op": "assert",
747 "lock": self.idl.lock_name})
748
749 # Add prerequisites and declarations of new rows.
750 for row in self._txn_rows.itervalues():
751 if row._prereqs:
752 rows = {}
753 columns = []
754 for column_name in row._prereqs:
755 columns.append(column_name)
756 rows[column_name] = row._data[column_name].to_json()
757 operations.append({"op": "wait",
758 "table": row._table.name,
759 "timeout": 0,
760 "where": _where_uuid_equals(row.uuid),
761 "until": "==",
762 "columns": columns,
763 "rows": [rows]})
764
765 # Add updates.
766 any_updates = False
767 for row in self._txn_rows.itervalues():
768 if row._changes is None:
769 if row._table.is_root:
770 operations.append({"op": "delete",
771 "table": row._table.name,
772 "where": _where_uuid_equals(row.uuid)})
773 any_updates = True
774 else:
775 # Let ovsdb-server decide whether to really delete it.
776 pass
777 elif row._changes:
778 op = {"table": row._table.name}
779 if row._data is None:
780 op["op"] = "insert"
781 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
782 any_updates = True
783
784 op_index = len(operations) - 1
785 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
786 else:
787 op["op"] = "update"
788 op["where"] = _where_uuid_equals(row.uuid)
789
790 row_json = {}
791 op["row"] = row_json
792
793 for column_name, datum in row._changes.iteritems():
794 if row._data is not None or not datum.is_default():
26bb0f31
EJ
795 row_json[column_name] = (
796 self._substitute_uuids(datum.to_json()))
8cdf0349
BP
797
798 # If anything really changed, consider it an update.
799 # We can't suppress not-really-changed values earlier
800 # or transactions would become nonatomic (see the big
801 # comment inside Transaction._write()).
802 if (not any_updates and row._data is not None and
803 row._data[column_name] != datum):
804 any_updates = True
805
806 if row._data is None or row_json:
807 operations.append(op)
808
809 # Add increment.
810 if self._inc_table and any_updates:
811 self._inc_index = len(operations) - 1
812
813 operations.append({"op": "mutate",
814 "table": self._inc_table,
26bb0f31
EJ
815 "where": self._substitute_uuids(
816 self._inc_where),
8cdf0349
BP
817 "mutations": [[self._inc_column, "+=", 1]]})
818 operations.append({"op": "select",
819 "table": self._inc_table,
26bb0f31
EJ
820 "where": self._substitute_uuids(
821 self._inc_where),
8cdf0349
BP
822 "columns": [self._inc_column]})
823
824 # Add comment.
825 if self._comments:
826 operations.append({"op": "comment",
827 "comment": "\n".join(self._comments)})
828
829 # Dry run?
830 if self.dry_run:
831 operations.append({"op": "abort"})
832
833 if not any_updates:
834 self._status = Transaction.UNCHANGED
835 else:
836 msg = ovs.jsonrpc.Message.create_request("transact", operations)
837 self._request_id = msg.id
838 if not self.idl._session.send(msg):
839 self.idl._outstanding_txns[self._request_id] = self
840 self._status = Transaction.INCOMPLETE
841 else:
4fdfe5cc 842 self._status = Transaction.AGAIN_WAIT
8cdf0349
BP
843
844 self.__disassemble()
845 return self._status
846
847 def commit_block(self):
848 while True:
849 status = self.commit()
850 if status != Transaction.INCOMPLETE:
851 return status
852
853 self.idl.run()
854
855 poller = ovs.poller.Poller()
856 self.idl.wait(poller)
857 self.wait(poller)
858 poller.block()
859
860 def get_increment_new_value(self):
861 assert self._status == Transaction.SUCCESS
862 return self._inc_new_value
863
864 def abort(self):
865 """Aborts this transaction. If Transaction.commit() has already been
866 called then the transaction might get committed anyhow."""
867 self.__disassemble()
868 if self._status in (Transaction.UNCOMMITTED,
869 Transaction.INCOMPLETE):
870 self._status = Transaction.ABORTED
871
def get_error(self):
    """Returns a string describing this transaction's current status,
    suitable for use in log messages.

    For any status other than Transaction.ERROR this is the result of
    Transaction.status_to_string().  For Transaction.ERROR it is the
    recorded error text, or a fixed placeholder if none was recorded."""
    if self._status != Transaction.ERROR:
        return Transaction.status_to_string(self._status)
    return self._error or "no error details available"
881
def __set_error_json(self, json):
    """Records 'json', serialized with ovs.json.to_string(), as this
    transaction's error text.

    Only the first error is kept: if an error has already been recorded,
    this call does nothing."""
    if self._error is not None:
        return
    self._error = ovs.json.to_string(json)
885
def get_insert_uuid(self, uuid):
    """Finds and returns the permanent UUID that the database assigned to a
    newly inserted row, given the provisional UUID that
    Transaction.insert() assigned locally to that row.

    Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
    or if it was assigned by that function and then deleted by Row.delete()
    within the same transaction.  (Rows that are inserted and then deleted
    within a single transaction are never sent to the database server, so
    it never assigns them a permanent UUID.)

    This transaction must have completed with Transaction.SUCCESS or
    Transaction.UNCHANGED."""
    finished = (Transaction.SUCCESS, Transaction.UNCHANGED)
    assert self._status in finished

    record = self._inserted_rows.get(uuid)
    return record.real if record else None
904
def _write(self, row, column, datum):
    """Stages a write of 'datum' to 'column' of 'row'.

    A copy of 'datum' is stored in 'row._changes' under the column's name
    and 'row' is registered in the '_txn_rows' map of the transaction
    reached through 'row._idl.txn'.

    'row._changes' must not be None.

    The write is skipped when 'column.alert' is false, the row has
    committed data, and 'datum' equals both the committed value and any
    value already staged for the column."""
    assert row._changes is not None

    txn = row._idl.txn

    # If this is a write-only column and the datum being written is the
    # same as the one already there, just skip the update entirely.  This
    # is worth optimizing because we have a lot of columns that get
    # periodically refreshed into the database but don't actually change
    # that often.
    #
    # We don't do this for read/write columns because that would break
    # atomicity of transactions--some other client might have written a
    # different value in that column since we read it.  (But if a whole
    # transaction only does writes of existing values, without making any
    # real changes, Transaction.commit() reports it as unchanged.)
    #
    # A row created by Transaction.insert() has no committed data yet
    # ('row._data' is None), so there is nothing to compare against and
    # the write must always be recorded.
    if (not column.alert and row._data is not None
            and row._data.get(column.name) == datum):
        new_value = row._changes.get(column.name)
        if new_value is None or new_value == datum:
            return

    txn._txn_rows[row.uuid] = row
    row._changes[column.name] = datum.copy()
929
def insert(self, table, new_uuid=None):
    """Inserts and returns a new row in 'table', which must be one of the
    ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.

    The new row is assigned a provisional UUID.  If 'new_uuid' is None
    then one is randomly generated; otherwise 'new_uuid' should specify a
    randomly generated uuid.UUID not otherwise in use.  ovsdb-server will
    assign a different UUID when the transaction is committed, but the IDL
    will replace any uses of the provisional UUID in the data to be
    committed by the UUID assigned by ovsdb-server.

    This transaction must still be Transaction.UNCOMMITTED."""
    assert self._status == Transaction.UNCOMMITTED

    provisional = uuid.uuid4() if new_uuid is None else new_uuid

    # The final None means the row has no committed data yet.
    row = Row(self.idl, table, provisional, None)
    key = row.uuid
    table.rows[key] = row
    self._txn_rows[key] = row
    return row
947
def _process_reply(self, msg):
    """Updates this transaction's status from 'msg', the JSON-RPC reply to
    its "transact" request.

    The resulting status is:

      - Transaction.ERROR if 'msg' is a JSON-RPC error, if its result is
        not a JSON array, if any operation reported an unrecognized error
        or had an unexpected type, or if the increment or insert replies
        could not be processed.

      - Transaction.NOT_LOCKED if an operation reported "not owner".

      - Transaction.AGAIN_WAIT or Transaction.AGAIN_NOW if an operation
        timed out or was null: AGAIN_WAIT if the IDL's 'change_seqno'
        still equals this transaction's '_commit_seqno', otherwise
        AGAIN_NOW.

      - Transaction.SUCCESS otherwise.

    An "aborted" error from an operation is ignored."""
    if msg.type == ovs.jsonrpc.Message.T_ERROR:
        self._status = Transaction.ERROR
        return

    ops = msg.result
    if not isinstance(ops, (list, tuple)):
        # XXX rate-limit
        vlog.warn('reply to "transact" is not JSON array')
        # The reply is unusable, so fail the transaction instead of
        # leaving its status unchanged and never resolved.
        self._status = Transaction.ERROR
        return

    hard_errors = False
    soft_errors = False
    lock_errors = False

    for op in ops:
        if op is None:
            # This isn't an error in itself but indicates that some
            # prior operation failed, so make sure that we know about
            # it.
            soft_errors = True
        elif isinstance(op, dict):
            op_error = op.get("error")
            if op_error is None:
                continue
            if op_error == "timed out":
                soft_errors = True
            elif op_error == "not owner":
                lock_errors = True
            elif op_error == "aborted":
                pass
            else:
                hard_errors = True
                self.__set_error_json(op)
        else:
            hard_errors = True
            self.__set_error_json(op)
            # XXX rate-limit
            vlog.warn("operation reply is not JSON null or object")

    # Only a fully successful reply carries increment and insert results.
    if not soft_errors and not hard_errors and not lock_errors:
        if self._inc_table and not self.__process_inc_reply(ops):
            hard_errors = True

        for insert in self._inserted_rows.itervalues():
            if not self.__process_insert_reply(insert, ops):
                hard_errors = True

    if hard_errors:
        self._status = Transaction.ERROR
    elif lock_errors:
        self._status = Transaction.NOT_LOCKED
    elif soft_errors:
        if self._commit_seqno == self.idl.change_seqno:
            self._status = Transaction.AGAIN_WAIT
        else:
            self._status = Transaction.AGAIN_NOW
    else:
        self._status = Transaction.SUCCESS
1003
@staticmethod
def __check_json_type(json, types, name):
    """Returns True if 'json' is truthy and its exact type is one of
    'types'.  Otherwise logs a warning that mentions 'name' and returns
    False.

    'name' is used only in the log messages."""
    # NOTE(review): the truthiness test means falsy values such as 0, []
    # and {} are reported as "missing" even though they are present --
    # confirm that this is intended.
    if json and type(json) in types:
        return True

    # XXX rate-limit
    if not json:
        vlog.warn("%s is missing" % name)
    else:
        vlog.warn("%s has unexpected type %s" % (name, type(json)))
    return False
1016
def __process_inc_reply(self, ops):
    """Validates the replies to the "mutate" and "select" operations that
    implement the increment and records the incremented value.

    'ops' is the list of operation results from the "transact" reply.  The
    "mutate" result is expected at index 'self._inc_index' and the "select"
    result immediately after it.

    On success stores the selected column value in 'self._inc_new_value'
    and returns True.  If the reply is too short or malformed, logs a
    warning and returns False."""
    needed = self._inc_index + 2
    if needed > len(ops):
        # XXX rate-limit
        vlog.warn("reply does not contain enough operations for "
                  "increment (has %d, needs %d)" %
                  (len(ops), needed))
        # Bail out here: indexing 'ops' below would raise IndexError.
        return False

    # We know that this is a JSON object because the loop in
    # __process_reply() already checked.
    mutate = ops[self._inc_index]
    count = mutate.get("count")
    if not Transaction.__check_json_type(count, (int, long),
                                         '"mutate" reply "count"'):
        return False
    if count != 1:
        # XXX rate-limit
        vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
        return False

    select = ops[self._inc_index + 1]
    rows = select.get("rows")
    if not Transaction.__check_json_type(rows, (list, tuple),
                                         '"select" reply "rows"'):
        return False
    if len(rows) != 1:
        # XXX rate-limit
        vlog.warn('"select" reply "rows" has %d elements '
                  'instead of 1' % len(rows))
        return False
    row = rows[0]
    if not Transaction.__check_json_type(row, (dict,),
                                         '"select" reply row'):
        return False
    column = row.get(self._inc_column)
    if not Transaction.__check_json_type(column, (int, long),
                                         '"select" reply inc column'):
        return False
    self._inc_new_value = column
    return True
1056
def __process_insert_reply(self, insert, ops):
    """Extracts the UUID assigned to one inserted row from the "transact"
    reply.

    'insert' describes the inserted row; 'insert.op_index' is the index in
    'ops' of the reply to its "insert" operation.  'ops' is the list of
    operation results from the reply.

    On success stores the parsed UUID in 'insert.real' and returns True.
    If the reply is too short or its "uuid" member is missing or
    malformed, logs a warning and returns False."""
    index = insert.op_index
    if index >= len(ops):
        # XXX rate-limit
        vlog.warn("reply does not contain enough operations "
                  "for insert (has %d, needs %d)"
                  % (len(ops), index))
        return False

    # We know that this is a JSON object because the loop in
    # __process_reply() already checked.
    json_uuid = ops[index].get("uuid")
    if not Transaction.__check_json_type(json_uuid, (tuple, list),
                                         '"insert" reply "uuid"'):
        return False

    try:
        real_uuid = ovs.ovsuuid.from_json(json_uuid)
    except error.Error:
        # XXX rate-limit
        vlog.warn('"insert" reply "uuid" is not a JSON UUID')
        return False
    else:
        insert.real = real_uuid
        return True
ad0991e6
EJ
1082
1083
class SchemaHelper(object):
    """IDL Schema helper.

    This class encapsulates the logic required to generate schemas suitable
    for creating 'ovs.db.idl.Idl' objects.  Clients should register the
    columns they are interested in using register_columns().  When
    finished, the get_idl_schema() function may be called.

    The location on disk of the schema used may be found in the
    'schema_location' variable."""

    def __init__(self, location=None):
        """Creates a new SchemaHelper.

        'location' is the path of the schema file.  If it is None, the
        file "vswitch.ovsschema" under ovs.dirs.PKGDATADIR is used."""
        self.schema_location = (
            "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
            if location is None else location)

        # Maps each registered table name to the set of its registered
        # column names.
        self._tables = {}

    def register_columns(self, table, columns):
        """Registers interest in the given 'columns' of 'table'.  Future
        calls to get_idl_schema() will include 'table':column for each
        column in 'columns'.  Columns registered by earlier calls are
        kept, and duplicate entries are merged.

        'table' must be a string.
        'columns' must be a list of strings.
        """
        assert type(table) is str
        assert type(columns) is list

        registered = self._tables.get(table, set())
        self._tables[table] = set(columns) | registered

    def get_idl_schema(self):
        """Gets a schema appropriate for the creation of an
        'ovs.db.idl.Idl' object, based on the columns registered using the
        register_columns() function.

        The schema is read from 'schema_location' on every call.  The
        result contains only the registered tables, each reduced to its
        registered columns."""
        raw = ovs.json.from_file(self.schema_location)
        schema = ovs.db.schema.DbSchema.from_json(raw)

        schema.tables = dict(
            (name, self._keep_table_columns(schema, name, wanted))
            for name, wanted in self._tables.iteritems())
        return schema

    def _keep_table_columns(self, schema, table_name, columns):
        """Reduces table 'table_name' of 'schema' to the named 'columns'
        and returns the table.

        'table_name' must exist in 'schema', and every name in 'columns'
        must be a string naming a column of that table.  The table's
        'columns' attribute is replaced in place."""
        assert table_name in schema.tables
        table = schema.tables[table_name]

        available = table.columns
        kept = {}
        for name in columns:
            assert type(name) is str
            assert name in available

            kept[name] = available[name]

        table.columns = kept
        return table