1 # Copyright (c) 2009, 2010, 2011, 2012, 2013, 2016 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 import ovs
.db
.data
as data
25 from ovs
.db
import custom_index
26 from ovs
.db
import error
30 vlog
= ovs
.vlog
.Vlog("idl")
32 __pychecker__
= 'no-classattr no-objattrs'
43 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
45 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
46 requests to an OVSDB database server and parses the responses, converting
47 raw JSON into data structures that are easier for clients to digest.
49 The IDL also assists with issuing database transactions. The client
50 creates a transaction, manipulates the IDL data structures, and commits or
51 aborts the transaction. The IDL then composes and issues the necessary
52 JSON-RPC requests and reports to the client whether the transaction
53 completed successfully.
55 The client is allowed to access the following attributes directly, in a
58 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
59 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
60 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
63 The client may directly read and write the Row objects referenced by the
64 'rows' map values. Refer to Row for more details.
66 - 'change_seqno': A number that represents the IDL's state. When the IDL
67 is updated (by Idl.run()), its value changes. The sequence number can
68 occasionally change even if the database does not. This happens if the
69 connection to the database drops and reconnects, which causes the
70 database contents to be reloaded even if they didn't change. (It could
71 also happen if the database server sends out a "change" that reflects
72 what the IDL already thought was in the database. The database server is
73 not supposed to do that, but bugs could in theory cause it to do so.)
75 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
76 if no lock is configured.
78 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
79 lock, and False otherwise.
81 Locking and unlocking happens asynchronously from the database client's
82 point of view, so the information is only useful for optimization
83 (e.g. if the client doesn't have the lock then there's no point in trying
84 to write to the database).
86 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
87 the database server has indicated that some other client already owns the
88 requested lock, and False otherwise.
90 - 'txn': The ovs.db.idl.Transaction object for the database transaction
91 currently being constructed, if there is one, or None otherwise.
95 IDL_S_MONITOR_REQUESTED
= 1
96 IDL_S_MONITOR_COND_REQUESTED
= 2
98 def __init__(self
, remote
, schema
, probe_interval
=None):
99 """Creates and returns a connection to the database named 'db_name' on
100 'remote', which should be in a form acceptable to
101 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
102 replica of the remote database.
104 'schema' should be the schema for the remote database. The caller may
105 have cut it down by removing tables or columns that are not of
106 interest. The IDL will only replicate the tables and columns that
107 remain. The caller may also add a attribute named 'alert' to selected
108 remaining columns, setting its value to False; if so, then changes to
109 those columns will not be considered changes to the database for the
110 purpose of the return value of Idl.run() and Idl.change_seqno. This is
111 useful for columns that the IDL's client will write but not read.
113 As a convenience to users, 'schema' may also be an instance of the
116 The IDL uses and modifies 'schema' directly.
118 If "probe_interval" is zero it disables the connection keepalive
119 feature. If non-zero the value will be forced to at least 1000
120 milliseconds. If None it will just use the default value in OVS.
123 assert isinstance(schema
, SchemaHelper
)
124 schema
= schema
.get_idl_schema()
126 self
.tables
= schema
.tables
127 self
.readonly
= schema
.readonly
129 self
._session
= ovs
.jsonrpc
.Session
.open(remote
,
130 probe_interval
=probe_interval
)
131 self
._monitor
_request
_id
= None
132 self
._last
_seqno
= None
133 self
.change_seqno
= 0
134 self
.uuid
= uuid
.uuid1()
135 self
.state
= self
.IDL_S_INITIAL
138 self
.lock_name
= None # Name of lock we need, None if none.
139 self
.has_lock
= False # Has db server said we have the lock?
140 self
.is_lock_contended
= False # Has db server said we can't get lock?
141 self
._lock
_request
_id
= None # JSON-RPC ID of in-flight lock request.
143 # Transaction support.
145 self
._outstanding
_txns
= {}
147 for table
in six
.itervalues(schema
.tables
):
148 for column
in six
.itervalues(table
.columns
):
149 if not hasattr(column
, 'alert'):
151 table
.need_table
= False
152 table
.rows
= custom_index
.IndexedRows(table
)
154 table
.condition
= [True]
155 table
.cond_changed
= False
157 def index_create(self
, table
, name
):
158 """Create a named multi-column index on a table"""
159 return self
.tables
[table
].rows
.index_create(name
)
161 def index_irange(self
, table
, name
, start
, end
):
162 """Return items in a named index between start/end inclusive"""
163 return self
.tables
[table
].rows
.indexes
[name
].irange(start
, end
)
165 def index_equal(self
, table
, name
, value
):
166 """Return items in a named index matching a value"""
167 return self
.tables
[table
].rows
.indexes
[name
].irange(value
, value
)
170 """Closes the connection to the database. The IDL will no longer
172 self
._session
.close()
175 """Processes a batch of messages from the database server. Returns
176 True if the database as seen through the IDL changed, False if it did
177 not change. The initial fetch of the entire contents of the remote
178 database is considered to be one kind of change. If the IDL has been
179 configured to acquire a database lock (with Idl.set_lock()), then
180 successfully acquiring the lock is also considered to be a change.
182 This function can return occasional false positives, that is, report
183 that the database changed even though it didn't. This happens if the
184 connection to the database drops and reconnects, which causes the
185 database contents to be reloaded even if they didn't change. (It could
186 also happen if the database server sends out a "change" that reflects
187 what we already thought was in the database, but the database server is
188 not supposed to do that.)
190 As an alternative to checking the return value, the client may check
191 for changes in self.change_seqno."""
193 initial_change_seqno
= self
.change_seqno
195 self
.send_cond_change()
200 if not self
._session
.is_connected():
203 seqno
= self
._session
.get_seqno()
204 if seqno
!= self
._last
_seqno
:
205 self
._last
_seqno
= seqno
206 self
.__txn
_abort
_all
()
207 self
.__send
_monitor
_request
()
209 self
.__send
_lock
_request
()
212 msg
= self
._session
.recv()
215 if (msg
.type == ovs
.jsonrpc
.Message
.T_NOTIFY
216 and msg
.method
== "update2"
217 and len(msg
.params
) == 2):
218 # Database contents changed.
219 self
.__parse
_update
(msg
.params
[1], OVSDB_UPDATE2
)
220 elif (msg
.type == ovs
.jsonrpc
.Message
.T_NOTIFY
221 and msg
.method
== "update"
222 and len(msg
.params
) == 2):
223 # Database contents changed.
224 self
.__parse
_update
(msg
.params
[1], OVSDB_UPDATE
)
225 elif (msg
.type == ovs
.jsonrpc
.Message
.T_REPLY
226 and self
._monitor
_request
_id
is not None
227 and self
._monitor
_request
_id
== msg
.id):
228 # Reply to our "monitor" request.
230 self
.change_seqno
+= 1
231 self
._monitor
_request
_id
= None
233 if self
.state
== self
.IDL_S_MONITOR_COND_REQUESTED
:
234 self
.__parse
_update
(msg
.result
, OVSDB_UPDATE2
)
236 assert self
.state
== self
.IDL_S_MONITOR_REQUESTED
237 self
.__parse
_update
(msg
.result
, OVSDB_UPDATE
)
239 except error
.Error
as e
:
240 vlog
.err("%s: parse error in received schema: %s"
241 % (self
._session
.get_name(), e
))
243 elif (msg
.type == ovs
.jsonrpc
.Message
.T_REPLY
244 and self
._lock
_request
_id
is not None
245 and self
._lock
_request
_id
== msg
.id):
246 # Reply to our "lock" request.
247 self
.__parse
_lock
_reply
(msg
.result
)
248 elif (msg
.type == ovs
.jsonrpc
.Message
.T_NOTIFY
249 and msg
.method
== "locked"):
251 self
.__parse
_lock
_notify
(msg
.params
, True)
252 elif (msg
.type == ovs
.jsonrpc
.Message
.T_NOTIFY
253 and msg
.method
== "stolen"):
254 # Someone else stole our lock.
255 self
.__parse
_lock
_notify
(msg
.params
, False)
256 elif msg
.type == ovs
.jsonrpc
.Message
.T_NOTIFY
and msg
.id == "echo":
257 # Reply to our echo request. Ignore it.
259 elif (msg
.type == ovs
.jsonrpc
.Message
.T_ERROR
and
260 self
.state
== self
.IDL_S_MONITOR_COND_REQUESTED
and
261 self
._monitor
_request
_id
== msg
.id):
262 if msg
.error
== "unknown method":
263 self
.__send
_monitor
_request
()
264 elif (msg
.type in (ovs
.jsonrpc
.Message
.T_ERROR
,
265 ovs
.jsonrpc
.Message
.T_REPLY
)
266 and self
.__txn
_process
_reply
(msg
)):
267 # __txn_process_reply() did everything needed.
270 # This can happen if a transaction is destroyed before we
271 # receive the reply, so keep the log level low.
272 vlog
.dbg("%s: received unexpected %s message"
273 % (self
._session
.get_name(),
274 ovs
.jsonrpc
.Message
.type_to_string(msg
.type)))
276 return initial_change_seqno
!= self
.change_seqno
278 def send_cond_change(self
):
279 if not self
._session
.is_connected():
282 for table
in six
.itervalues(self
.tables
):
283 if table
.cond_changed
:
284 self
.__send
_cond
_change
(table
, table
.condition
)
285 table
.cond_changed
= False
287 def cond_change(self
, table_name
, cond
):
288 """Sets the condition for 'table_name' to 'cond', which should be a
289 conditional expression suitable for use directly in the OVSDB
290 protocol, with the exception that the empty condition []
291 matches no rows (instead of matching every row). That is, []
292 is equivalent to [False], not to [True].
295 table
= self
.tables
.get(table_name
)
297 raise error
.Error('Unknown table "%s"' % table_name
)
301 if table
.condition
!= cond
:
302 table
.condition
= cond
303 table
.cond_changed
= True
305 def wait(self
, poller
):
306 """Arranges for poller.block() to wake up when self.run() has something
307 to do or when activity occurs on a transaction on 'self'."""
308 self
._session
.wait(poller
)
309 self
._session
.recv_wait(poller
)
311 def has_ever_connected(self
):
312 """Returns True, if the IDL successfully connected to the remote
313 database and retrieved its contents (even if the connection
314 subsequently dropped and is in the process of reconnecting). If so,
315 then the IDL contains an atomic snapshot of the database's contents
316 (but it might be arbitrarily old if the connection dropped).
318 Returns False if the IDL has never connected or retrieved the
319 database's contents. If so, the IDL is empty."""
320 return self
.change_seqno
!= 0
322 def force_reconnect(self
):
323 """Forces the IDL to drop its connection to the database and reconnect.
324 In the meantime, the contents of the IDL will not change."""
325 self
._session
.force_reconnect()
327 def set_lock(self
, lock_name
):
328 """If 'lock_name' is not None, configures the IDL to obtain the named
329 lock from the database server and to avoid modifying the database when
330 the lock cannot be acquired (that is, when another client has the same
333 If 'lock_name' is None, drops the locking requirement and releases the
336 assert not self
._outstanding
_txns
338 if self
.lock_name
and (not lock_name
or lock_name
!= self
.lock_name
):
339 # Release previous lock.
340 self
.__send
_unlock
_request
()
341 self
.lock_name
= None
342 self
.is_lock_contended
= False
344 if lock_name
and not self
.lock_name
:
346 self
.lock_name
= lock_name
347 self
.__send
_lock
_request
()
349 def notify(self
, event
, row
, updates
=None):
350 """Hook for implementing create/update/delete notifications
352 :param event: The event that was triggered
353 :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
354 :param row: The row as it is after the operation has occured
356 :param updates: For updates, row with only old values of the changed
361 def __send_cond_change(self
, table
, cond
):
362 monitor_cond_change
= {table
.name
: [{"where": cond
}]}
363 old_uuid
= str(self
.uuid
)
364 self
.uuid
= uuid
.uuid1()
365 params
= [old_uuid
, str(self
.uuid
), monitor_cond_change
]
366 msg
= ovs
.jsonrpc
.Message
.create_request("monitor_cond_change", params
)
367 self
._session
.send(msg
)
372 for table
in six
.itervalues(self
.tables
):
375 table
.rows
= custom_index
.IndexedRows(table
)
378 self
.change_seqno
+= 1
380 def __update_has_lock(self
, new_has_lock
):
381 if new_has_lock
and not self
.has_lock
:
382 if self
._monitor
_request
_id
is None:
383 self
.change_seqno
+= 1
385 # We're waiting for a monitor reply, so don't signal that the
386 # database changed. The monitor reply will increment
387 # change_seqno anyhow.
389 self
.is_lock_contended
= False
390 self
.has_lock
= new_has_lock
392 def __do_send_lock_request(self
, method
):
393 self
.__update
_has
_lock
(False)
394 self
._lock
_request
_id
= None
395 if self
._session
.is_connected():
396 msg
= ovs
.jsonrpc
.Message
.create_request(method
, [self
.lock_name
])
398 self
._session
.send(msg
)
403 def __send_lock_request(self
):
404 self
._lock
_request
_id
= self
.__do
_send
_lock
_request
("lock")
406 def __send_unlock_request(self
):
407 self
.__do
_send
_lock
_request
("unlock")
409 def __parse_lock_reply(self
, result
):
410 self
._lock
_request
_id
= None
411 got_lock
= isinstance(result
, dict) and result
.get("locked") is True
412 self
.__update
_has
_lock
(got_lock
)
414 self
.is_lock_contended
= True
416 def __parse_lock_notify(self
, params
, new_has_lock
):
417 if (self
.lock_name
is not None
418 and isinstance(params
, (list, tuple))
420 and params
[0] == self
.lock_name
):
421 self
.__update
_has
_lock
(new_has_lock
)
423 self
.is_lock_contended
= True
425 def __send_monitor_request(self
):
426 if self
.state
== self
.IDL_S_INITIAL
:
427 self
.state
= self
.IDL_S_MONITOR_COND_REQUESTED
428 method
= "monitor_cond"
430 self
.state
= self
.IDL_S_MONITOR_REQUESTED
433 monitor_requests
= {}
434 for table
in six
.itervalues(self
.tables
):
436 for column
in six
.iterkeys(table
.columns
):
437 if ((table
.name
not in self
.readonly
) or
438 (table
.name
in self
.readonly
) and
439 (column
not in self
.readonly
[table
.name
])):
440 columns
.append(column
)
441 monitor_request
= {"columns": columns
}
442 if method
== "monitor_cond" and table
.condition
!= [True]:
443 monitor_request
["where"] = table
.condition
444 table
.cond_change
= False
445 monitor_requests
[table
.name
] = [monitor_request
]
447 msg
= ovs
.jsonrpc
.Message
.create_request(
448 method
, [self
._db
.name
, str(self
.uuid
), monitor_requests
])
449 self
._monitor
_request
_id
= msg
.id
450 self
._session
.send(msg
)
452 def __parse_update(self
, update
, version
):
454 self
.__do
_parse
_update
(update
, version
)
455 except error
.Error
as e
:
456 vlog
.err("%s: error parsing update: %s"
457 % (self
._session
.get_name(), e
))
459 def __do_parse_update(self
, table_updates
, version
):
460 if not isinstance(table_updates
, dict):
461 raise error
.Error("<table-updates> is not an object",
464 for table_name
, table_update
in six
.iteritems(table_updates
):
465 table
= self
.tables
.get(table_name
)
467 raise error
.Error('<table-updates> includes unknown '
468 'table "%s"' % table_name
)
470 if not isinstance(table_update
, dict):
471 raise error
.Error('<table-update> for table "%s" is not '
472 'an object' % table_name
, table_update
)
474 for uuid_string
, row_update
in six
.iteritems(table_update
):
475 if not ovs
.ovsuuid
.is_valid_string(uuid_string
):
476 raise error
.Error('<table-update> for table "%s" '
477 'contains bad UUID "%s" as member '
478 'name' % (table_name
, uuid_string
),
480 uuid
= ovs
.ovsuuid
.from_string(uuid_string
)
482 if not isinstance(row_update
, dict):
483 raise error
.Error('<table-update> for table "%s" '
484 'contains <row-update> for %s that '
486 % (table_name
, uuid_string
))
488 if version
== OVSDB_UPDATE2
:
489 if self
.__process
_update
2(table
, uuid
, row_update
):
490 self
.change_seqno
+= 1
493 parser
= ovs
.db
.parser
.Parser(row_update
, "row-update")
494 old
= parser
.get_optional("old", [dict])
495 new
= parser
.get_optional("new", [dict])
498 if not old
and not new
:
499 raise error
.Error('<row-update> missing "old" and '
500 '"new" members', row_update
)
502 if self
.__process
_update
(table
, uuid
, old
, new
):
503 self
.change_seqno
+= 1
505 def __process_update2(self
, table
, uuid
, row_update
):
506 row
= table
.rows
.get(uuid
)
508 if "delete" in row_update
:
511 self
.notify(ROW_DELETE
, row
)
515 vlog
.warn("cannot delete missing row %s from table"
516 "%s" % (uuid
, table
.name
))
517 elif "insert" in row_update
or "initial" in row_update
:
519 vlog
.warn("cannot add existing row %s from table"
520 " %s" % (uuid
, table
.name
))
522 row
= self
.__create
_row
(table
, uuid
)
523 if "insert" in row_update
:
524 row_update
= row_update
['insert']
526 row_update
= row_update
['initial']
527 self
.__add
_default
(table
, row_update
)
528 changed
= self
.__row
_update
(table
, row
, row_update
)
529 table
.rows
[uuid
] = row
531 self
.notify(ROW_CREATE
, row
)
532 elif "modify" in row_update
:
534 raise error
.Error('Modify non-existing row')
536 old_row
= self
.__apply
_diff
(table
, row
, row_update
['modify'])
537 self
.notify(ROW_UPDATE
, row
, Row(self
, table
, uuid
, old_row
))
540 raise error
.Error('<row-update> unknown operation',
544 def __process_update(self
, table
, uuid
, old
, new
):
545 """Returns True if a column changed, False otherwise."""
546 row
= table
.rows
.get(uuid
)
553 self
.notify(ROW_DELETE
, row
)
556 vlog
.warn("cannot delete missing row %s from table %s"
557 % (uuid
, table
.name
))
562 row
= self
.__create
_row
(table
, uuid
)
567 vlog
.warn("cannot add existing row %s to table %s"
568 % (uuid
, table
.name
))
569 changed |
= self
.__row
_update
(table
, row
, new
)
571 table
.rows
[uuid
] = row
573 self
.notify(ROW_CREATE
, row
)
577 row
= self
.__create
_row
(table
, uuid
)
581 vlog
.warn("cannot modify missing row %s in table %s"
582 % (uuid
, table
.name
))
583 changed |
= self
.__row
_update
(table
, row
, new
)
585 table
.rows
[uuid
] = row
587 self
.notify(op
, row
, Row
.from_json(self
, table
, uuid
, old
))
590 def __column_name(self
, column
):
591 if column
.type.key
.type == ovs
.db
.types
.UuidType
:
592 return ovs
.ovsuuid
.to_json(column
.type.key
.type.default
)
594 return column
.type.key
.type.default
596 def __add_default(self
, table
, row_update
):
597 for column
in six
.itervalues(table
.columns
):
598 if column
.name
not in row_update
:
599 if ((table
.name
not in self
.readonly
) or
600 (table
.name
in self
.readonly
) and
601 (column
.name
not in self
.readonly
[table
.name
])):
602 if column
.type.n_min
!= 0 and not column
.type.is_map():
603 row_update
[column
.name
] = self
.__column
_name
(column
)
605 def __apply_diff(self
, table
, row
, row_diff
):
607 for column_name
, datum_diff_json
in six
.iteritems(row_diff
):
608 column
= table
.columns
.get(column_name
)
611 vlog
.warn("unknown column %s updating table %s"
612 % (column_name
, table
.name
))
616 datum_diff
= data
.Datum
.from_json(column
.type, datum_diff_json
)
617 except error
.Error
as e
:
619 vlog
.warn("error parsing column %s in table %s: %s"
620 % (column_name
, table
.name
, e
))
623 old_row
[column_name
] = row
._data
[column_name
].copy()
624 datum
= row
._data
[column_name
].diff(datum_diff
)
625 if datum
!= row
._data
[column_name
]:
626 row
._data
[column_name
] = datum
630 def __row_update(self
, table
, row
, row_json
):
632 for column_name
, datum_json
in six
.iteritems(row_json
):
633 column
= table
.columns
.get(column_name
)
636 vlog
.warn("unknown column %s updating table %s"
637 % (column_name
, table
.name
))
641 datum
= data
.Datum
.from_json(column
.type, datum_json
)
642 except error
.Error
as e
:
644 vlog
.warn("error parsing column %s in table %s: %s"
645 % (column_name
, table
.name
, e
))
648 if datum
!= row
._data
[column_name
]:
649 row
._data
[column_name
] = datum
653 # Didn't really change but the OVSDB monitor protocol always
654 # includes every value in a row.
658 def __create_row(self
, table
, uuid
):
660 for column
in six
.itervalues(table
.columns
):
661 data
[column
.name
] = ovs
.db
.data
.Datum
.default(column
.type)
662 return Row(self
, table
, uuid
, data
)
665 self
._session
.force_reconnect()
667 def __txn_abort_all(self
):
668 while self
._outstanding
_txns
:
669 txn
= self
._outstanding
_txns
.popitem()[1]
670 txn
._status
= Transaction
.TRY_AGAIN
672 def __txn_process_reply(self
, msg
):
673 txn
= self
._outstanding
_txns
.pop(msg
.id, None)
675 txn
._process
_reply
(msg
)
679 def _uuid_to_row(atom
, base
):
681 return base
.ref_table
.rows
.get(atom
)
686 def _row_to_uuid(value
):
687 if isinstance(value
, Row
):
693 @functools.total_ordering
695 """A row within an IDL.
697 The client may access the following attributes directly:
699 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
701 - An attribute for each column in the Row's table, named for the column,
702 whose values are as returned by Datum.to_python() for the column's type.
704 If some error occurs (e.g. the database server's idea of the column is
705 different from the IDL's idea), then the attribute values is the
706 "default" value return by Datum.default() for the column's type. (It is
707 important to know this because the default value may violate constraints
708 for the column's type, e.g. the default integer value is 0 even if column
709 contraints require the column's value to be positive.)
711 When a transaction is active, column attributes may also be assigned new
712 values. Committing the transaction will then cause the new value to be
713 stored into the database.
715 *NOTE*: In the current implementation, the value of a column is a *copy*
716 of the value in the database. This means that modifying its value
717 directly will have no useful effect. For example, the following:
718 row.mycolumn["a"] = "b" # don't do this
719 will not change anything in the database, even after commit. To modify
720 the column, instead assign the modified column value back to the column:
725 def __init__(self
, idl
, table
, uuid
, data
):
726 # All of the explicit references to self.__dict__ below are required
727 # to set real attributes with invoking self.__getattr__().
728 self
.__dict
__["uuid"] = uuid
730 self
.__dict
__["_idl"] = idl
731 self
.__dict
__["_table"] = table
733 # _data is the committed data. It takes the following values:
735 # - A dictionary that maps every column name to a Datum, if the row
736 # exists in the committed form of the database.
738 # - None, if this row is newly inserted within the active transaction
739 # and thus has no committed form.
740 self
.__dict
__["_data"] = data
742 # _changes describes changes to this row within the active transaction.
743 # It takes the following values:
745 # - {}, the empty dictionary, if no transaction is active or if the
746 # row has yet not been changed within this transaction.
748 # - A dictionary that maps a column name to its new Datum, if an
749 # active transaction changes those columns' values.
751 # - A dictionary that maps every column name to a Datum, if the row
752 # is newly inserted within the active transaction.
754 # - None, if this transaction deletes this row.
755 self
.__dict
__["_changes"] = {}
757 # _mutations describes changes to this row to be handled via a
758 # mutate operation on the wire. It takes the following values:
760 # - {}, the empty dictionary, if no transaction is active or if the
761 # row has yet not been mutated within this transaction.
763 # - A dictionary that contains two keys:
765 # - "_inserts" contains a dictionary that maps column names to
766 # new keys/key-value pairs that should be inserted into the
768 # - "_removes" contains a dictionary that maps column names to
769 # the keys/key-value pairs that should be removed from the
772 # - None, if this transaction deletes this row.
773 self
.__dict
__["_mutations"] = {}
775 # A dictionary whose keys are the names of columns that must be
776 # verified as prerequisites when the transaction commits. The values
777 # in the dictionary are all None.
778 self
.__dict
__["_prereqs"] = {}
780 def __lt__(self
, other
):
781 if not isinstance(other
, Row
):
782 return NotImplemented
783 return bool(self
.__dict
__['uuid'] < other
.__dict
__['uuid'])
785 def __eq__(self
, other
):
786 if not isinstance(other
, Row
):
787 return NotImplemented
788 return bool(self
.__dict
__['uuid'] == other
.__dict
__['uuid'])
791 return int(self
.__dict
__['uuid'])
793 def __getattr__(self
, column_name
):
794 assert self
._changes
is not None
795 assert self
._mutations
is not None
798 column
= self
._table
.columns
[column_name
]
800 raise AttributeError("%s instance has no attribute '%s'" %
801 (self
.__class
__.__name
__, column_name
))
802 datum
= self
._changes
.get(column_name
)
804 if '_inserts' in self
._mutations
.keys():
805 inserts
= self
._mutations
['_inserts'].get(column_name
)
807 if '_removes' in self
._mutations
.keys():
808 removes
= self
._mutations
['_removes'].get(column_name
)
810 if self
._data
is None:
812 raise AttributeError("%s instance has no attribute '%s'" %
813 (self
.__class
__.__name
__,
816 datum
= data
.Datum
.from_python(column
.type,
819 elif column_name
in self
._data
:
820 datum
= self
._data
[column_name
]
821 if column
.type.is_set():
822 dlist
= datum
.as_list()
823 if inserts
is not None:
824 dlist
.extend(list(inserts
))
825 if removes
is not None:
826 removes_datum
= data
.Datum
.from_python(column
.type,
829 removes_list
= removes_datum
.as_list()
830 dlist
= [x
for x
in dlist
if x
not in removes_list
]
831 datum
= data
.Datum
.from_python(column
.type, dlist
,
833 elif column
.type.is_map():
834 dmap
= datum
.to_python(_uuid_to_row
)
835 if inserts
is not None:
837 if removes
is not None:
839 if key
not in (inserts
or {}):
841 datum
= data
.Datum
.from_python(column
.type, dmap
,
845 raise AttributeError("%s instance has no attribute '%s'" %
846 (self
.__class
__.__name
__,
851 return datum
.to_python(_uuid_to_row
)
853 def __setattr__(self
, column_name
, value
):
854 assert self
._changes
is not None
857 if ((self
._table
.name
in self
._idl
.readonly
) and
858 (column_name
in self
._idl
.readonly
[self
._table
.name
])):
859 vlog
.warn("attempting to write to readonly column %s"
863 column
= self
._table
.columns
[column_name
]
865 datum
= data
.Datum
.from_python(column
.type, value
, _row_to_uuid
)
866 except error
.Error
as e
:
868 vlog
.err("attempting to write bad value to column %s (%s)"
871 # Remove prior version of the Row from the index if it has the indexed
872 # column set, and the column changing is an indexed column
873 if hasattr(self
, column_name
):
874 for idx
in self
._table
.rows
.indexes
.values():
875 if column_name
in (c
.column
for c
in idx
.columns
):
877 self
._idl
.txn
._write
(self
, column
, datum
)
878 for idx
in self
._table
.rows
.indexes
.values():
879 # Only update the index if indexed columns change
880 if column_name
in (c
.column
for c
in idx
.columns
):
883 def addvalue(self
, column_name
, key
):
884 self
._idl
.txn
._txn
_rows
[self
.uuid
] = self
885 column
= self
._table
.columns
[column_name
]
887 data
.Datum
.from_python(column
.type, key
, _row_to_uuid
)
888 except error
.Error
as e
:
890 vlog
.err("attempting to write bad value to column %s (%s)"
893 inserts
= self
._mutations
.setdefault('_inserts', {})
894 column_value
= inserts
.setdefault(column_name
, set())
895 column_value
.add(key
)
897 def delvalue(self
, column_name
, key
):
898 self
._idl
.txn
._txn
_rows
[self
.uuid
] = self
899 column
= self
._table
.columns
[column_name
]
901 data
.Datum
.from_python(column
.type, key
, _row_to_uuid
)
902 except error
.Error
as e
:
904 vlog
.err("attempting to delete bad value from column %s (%s)"
907 removes
= self
._mutations
.setdefault('_removes', {})
908 column_value
= removes
.setdefault(column_name
, set())
909 column_value
.add(key
)
911 def setkey(self
, column_name
, key
, value
):
912 self
._idl
.txn
._txn
_rows
[self
.uuid
] = self
913 column
= self
._table
.columns
[column_name
]
915 data
.Datum
.from_python(column
.type, {key
: value
}, _row_to_uuid
)
916 except error
.Error
as e
:
918 vlog
.err("attempting to write bad value to column %s (%s)"
921 if self
._data
and column_name
in self
._data
:
922 # Remove existing key/value before updating.
923 removes
= self
._mutations
.setdefault('_removes', {})
924 column_value
= removes
.setdefault(column_name
, set())
925 column_value
.add(key
)
926 inserts
= self
._mutations
.setdefault('_inserts', {})
927 column_value
= inserts
.setdefault(column_name
, {})
928 column_value
[key
] = value
930 def delkey(self
, column_name
, key
, value
=None):
931 self
._idl
.txn
._txn
_rows
[self
.uuid
] = self
934 old_value
= data
.Datum
.to_python(self
._data
[column_name
],
938 if key
not in old_value
:
940 if old_value
[key
] != value
:
942 removes
= self
._mutations
.setdefault('_removes', {})
943 column_value
= removes
.setdefault(column_name
, set())
944 column_value
.add(key
)
948 def from_json(cls
, idl
, table
, uuid
, row_json
):
950 for column_name
, datum_json
in six
.iteritems(row_json
):
951 column
= table
.columns
.get(column_name
)
954 vlog
.warn("unknown column %s in table %s"
955 % (column_name
, table
.name
))
958 datum
= ovs
.db
.data
.Datum
.from_json(column
.type, datum_json
)
959 except error
.Error
as e
:
961 vlog
.warn("error parsing column %s in table %s: %s"
962 % (column_name
, table
.name
, e
))
964 data
[column_name
] = datum
965 return cls(idl
, table
, uuid
, data
)
967 def verify(self
, column_name
):
968 """Causes the original contents of column 'column_name' in this row to
969 be verified as a prerequisite to completing the transaction. That is,
970 if 'column_name' changed in this row (or if this row was deleted)
971 between the time that the IDL originally read its contents and the time
972 that the transaction commits, then the transaction aborts and
973 Transaction.commit() returns Transaction.TRY_AGAIN.
975 The intention is that, to ensure that no transaction commits based on
976 dirty reads, an application should call Row.verify() on each data item
977 read as part of a read-modify-write operation.
979 In some cases Row.verify() reduces to a no-op, because the current
980 value of the column is already known:
982 - If this row is a row created by the current transaction (returned
983 by Transaction.insert()).
985 - If the column has already been modified within the current
988 Because of the latter property, always call Row.verify() *before*
989 modifying the column, for a given read-modify-write.
991 A transaction must be in progress."""
993 assert self
._changes
is not None
994 if not self
._data
or column_name
in self
._changes
:
997 self
._prereqs
[column_name
] = None
1000 """Deletes this row from its table.
1002 A transaction must be in progress."""
1003 assert self
._idl
.txn
1004 assert self
._changes
is not None
1005 if self
._data
is None:
1006 del self
._idl
.txn
._txn
_rows
[self
.uuid
]
1008 self
._idl
.txn
._txn
_rows
[self
.uuid
] = self
1009 del self
._table
.rows
[self
.uuid
]
1010 self
.__dict
__["_changes"] = None
1012 def fetch(self
, column_name
):
1013 self
._idl
.txn
._fetch
(self
, column_name
)
1015 def increment(self
, column_name
):
1016 """Causes the transaction, when committed, to increment the value of
1017 'column_name' within this row by 1. 'column_name' must have an integer
1018 type. After the transaction commits successfully, the client may
1019 retrieve the final (incremented) value of 'column_name' with
1020 Transaction.get_increment_new_value().
1022 The client could accomplish something similar by reading and writing
1023 and verify()ing columns. However, increment() will never (by itself)
1024 cause a transaction to fail because of a verify error.
1026 The intended use is for incrementing the "next_cfg" column in
1027 the Open_vSwitch table."""
1028 self
._idl
.txn
._increment
(self
, column_name
)
1031 def _uuid_name_from_uuid(uuid
):
1032 return "row%s" % str(uuid
).replace("-", "_")
1035 def _where_uuid_equals(uuid
):
1036 return [["_uuid", "==", ["uuid", str(uuid
)]]]
1039 class _InsertedRow(object):
1040 def __init__(self
, op_index
):
1041 self
.op_index
= op_index
class Transaction(object):
    """A transaction may modify the contents of a database by modifying the
    values of columns, deleting rows, inserting rows, or adding checks that
    columns in the database have not changed ("verify" operations), through
    Row and Transaction methods.

    Reading and writing columns and inserting and deleting rows are all
    straightforward.  The reasons to verify columns are less obvious.
    Verification is the key to maintaining transactional integrity.  Because
    OVSDB handles multiple clients, it can happen that between the time that
    OVSDB client A reads a column and writes a new value, OVSDB client B has
    written that column.  Client A's write should not ordinarily overwrite
    client B's, especially if the column in question is a "map" column that
    contains several more or less independent data items.  If client A adds a
    "verify" operation before it writes the column, then the transaction fails
    in case client B modifies it first.  Client A will then see the new value
    of the column and compose a new transaction based on the new contents
    written by client B.

    When a transaction is complete, which must be before the next call to
    Idl.run(), call Transaction.commit() or Transaction.abort().

    The life-cycle of a transaction looks like this:

    1. Create the transaction and record the initial sequence number:

        seqno = idl.change_seqno(idl)
        txn = Transaction(idl)

    2. Modify the database with Row and Transaction methods.

    3. Commit the transaction by calling Transaction.commit().  The first call
       to this function probably returns Transaction.INCOMPLETE.  The client
       must keep calling again along as this remains true, calling Idl.run()
       in between to let the IDL do protocol processing.  (If the client
       doesn't have anything else to do in the meantime, it can use
       Transaction.commit_block() to avoid having to loop itself.)

    4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
       to change from the saved 'seqno' (it's possible that it's already
       changed, in which case the client should not wait at all), then start
       over from step 1.  Only a call to Idl.run() will change the return
       value of Idl.change_seqno.  (Transaction.commit_block() calls
       Idl.run().)"""

    # Status values that Transaction.commit() can return.

    # Not yet committed or aborted.
    UNCOMMITTED = "uncommitted"
    # Transaction didn't include any changes.
    UNCHANGED = "unchanged"
    # Commit in progress, please wait.
    INCOMPLETE = "incomplete"
    # ovsdb_idl_txn_abort() called.
    ABORTED = "aborted"
    # Commit successful.
    SUCCESS = "success"
    # Commit failed because a "verify" operation
    # reported an inconsistency, due to a network
    # problem, or other transient failure.  Wait
    # for a change, then try again.
    TRY_AGAIN = "try again"
    # Server hasn't given us the lock yet.
    NOT_LOCKED = "not locked"
    # Commit failed due to a hard error.
    ERROR = "error"
1112 def status_to_string(status
):
1113 """Converts one of the status values that Transaction.commit() can
1114 return into a human-readable string.
1116 (The status values are in fact such strings already, so
1117 there's nothing to do.)"""
1120 def __init__(self
, idl
):
1121 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
1122 A given Idl may only have a single active transaction at a time.
1124 A Transaction may modify the contents of a database by assigning new
1125 values to columns (attributes of Row), deleting rows (with
1126 Row.delete()), or inserting rows (with Transaction.insert()). It may
1127 also check that columns in the database have not changed with
1130 When a transaction is complete (which must be before the next call to
1131 Idl.run()), call Transaction.commit() or Transaction.abort()."""
1132 assert idl
.txn
is None
1135 self
._request
_id
= None
1137 self
.dry_run
= False
1139 self
._status
= Transaction
.UNCOMMITTED
1143 self
._inc
_row
= None
1144 self
._inc
_column
= None
1146 self
._fetch
_requests
= []
1148 self
._inserted
_rows
= {} # Map from UUID to _InsertedRow
1150 def add_comment(self
, comment
):
1151 """Appends 'comment' to the comments that will be passed to the OVSDB
1152 server when this transaction is committed. (The comment will be
1153 committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
1154 relatively human-readable form.)"""
1155 self
._comments
.append(comment
)
1157 def wait(self
, poller
):
1158 """Causes poll_block() to wake up if this transaction has completed
1160 if self
._status
not in (Transaction
.UNCOMMITTED
,
1161 Transaction
.INCOMPLETE
):
1162 poller
.immediate_wake()
1164 def _substitute_uuids(self
, json
):
1165 if isinstance(json
, (list, tuple)):
1167 and json
[0] == 'uuid'
1168 and ovs
.ovsuuid
.is_valid_string(json
[1])):
1169 uuid
= ovs
.ovsuuid
.from_string(json
[1])
1170 row
= self
._txn
_rows
.get(uuid
, None)
1171 if row
and row
._data
is None:
1172 return ["named-uuid", _uuid_name_from_uuid(uuid
)]
1174 return [self
._substitute
_uuids
(elem
) for elem
in json
]
1177 def __disassemble(self
):
1180 for row
in six
.itervalues(self
._txn
_rows
):
1181 if row
._changes
is None:
1182 # If we add the deleted row back to rows with _changes == None
1183 # then __getattr__ will not work for the indexes
1184 row
.__dict
__["_changes"] = {}
1185 row
.__dict
__["_mutations"] = {}
1186 row
._table
.rows
[row
.uuid
] = row
1187 elif row
._data
is None:
1188 del row
._table
.rows
[row
.uuid
]
1189 row
.__dict
__["_changes"] = {}
1190 row
.__dict
__["_mutations"] = {}
1191 row
.__dict
__["_prereqs"] = {}
1195 """Attempts to commit 'txn'. Returns the status of the commit
1196 operation, one of the following constants:
1198 Transaction.INCOMPLETE:
1200 The transaction is in progress, but not yet complete. The caller
1201 should call again later, after calling Idl.run() to let the
1202 IDL do OVSDB protocol processing.
1204 Transaction.UNCHANGED:
1206 The transaction is complete. (It didn't actually change the
1207 database, so the IDL didn't send any request to the database
1210 Transaction.ABORTED:
1212 The caller previously called Transaction.abort().
1214 Transaction.SUCCESS:
1216 The transaction was successful. The update made by the
1217 transaction (and possibly other changes made by other database
1218 clients) should already be visible in the IDL.
1220 Transaction.TRY_AGAIN:
1222 The transaction failed for some transient reason, e.g. because a
1223 "verify" operation reported an inconsistency or due to a network
1224 problem. The caller should wait for a change to the database,
1225 then compose a new transaction, and commit the new transaction.
1227 Use Idl.change_seqno to wait for a change in the database. It is
1228 important to use its value *before* the initial call to
1229 Transaction.commit() as the baseline for this purpose, because
1230 the change that one should wait for can happen after the initial
1231 call but before the call that returns Transaction.TRY_AGAIN, and
1232 using some other baseline value in that situation could cause an
1233 indefinite wait if the database rarely changes.
1235 Transaction.NOT_LOCKED:
1237 The transaction failed because the IDL has been configured to
1238 require a database lock (with Idl.set_lock()) but didn't
1239 get it yet or has already lost it.
1241 Committing a transaction rolls back all of the changes that it made to
1242 the IDL's copy of the database. If the transaction commits
1243 successfully, then the database server will send an update and, thus,
1244 the IDL will be updated with the committed changes."""
1245 # The status can only change if we're the active transaction.
1246 # (Otherwise, our status will change only in Idl.run().)
1247 if self
!= self
.idl
.txn
:
1250 # If we need a lock but don't have it, give up quickly.
1251 if self
.idl
.lock_name
and not self
.idl
.has_lock
:
1252 self
._status
= Transaction
.NOT_LOCKED
1253 self
.__disassemble
()
1256 operations
= [self
.idl
._db
.name
]
1258 # Assert that we have the required lock (avoiding a race).
1259 if self
.idl
.lock_name
:
1260 operations
.append({"op": "assert",
1261 "lock": self
.idl
.lock_name
})
1263 # Add prerequisites and declarations of new rows.
1264 for row
in six
.itervalues(self
._txn
_rows
):
1268 for column_name
in row
._prereqs
:
1269 columns
.append(column_name
)
1270 rows
[column_name
] = row
._data
[column_name
].to_json()
1271 operations
.append({"op": "wait",
1272 "table": row
._table
.name
,
1274 "where": _where_uuid_equals(row
.uuid
),
1281 for row
in six
.itervalues(self
._txn
_rows
):
1282 if row
._changes
is None:
1283 if row
._table
.is_root
:
1284 operations
.append({"op": "delete",
1285 "table": row
._table
.name
,
1286 "where": _where_uuid_equals(row
.uuid
)})
1289 # Let ovsdb-server decide whether to really delete it.
1292 op
= {"table": row
._table
.name
}
1293 if row
._data
is None:
1295 op
["uuid-name"] = _uuid_name_from_uuid(row
.uuid
)
1298 op_index
= len(operations
) - 1
1299 self
._inserted
_rows
[row
.uuid
] = _InsertedRow(op_index
)
1302 op
["where"] = _where_uuid_equals(row
.uuid
)
1305 op
["row"] = row_json
1307 for column_name
, datum
in six
.iteritems(row
._changes
):
1308 if row
._data
is not None or not datum
.is_default():
1309 row_json
[column_name
] = (
1310 self
._substitute
_uuids
(datum
.to_json()))
1312 # If anything really changed, consider it an update.
1313 # We can't suppress not-really-changed values earlier
1314 # or transactions would become nonatomic (see the big
1315 # comment inside Transaction._write()).
1316 if (not any_updates
and row
._data
is not None and
1317 row
._data
[column_name
] != datum
):
1320 if row
._data
is None or row_json
:
1321 operations
.append(op
)
1324 op
= {"table": row
._table
.name
}
1326 if row
._data
is None:
1328 op
["where"] = self
._substitute
_uuids
(
1329 _where_uuid_equals(row
.uuid
))
1332 op
["where"] = _where_uuid_equals(row
.uuid
)
1333 op
["mutations"] = []
1334 if '_removes' in row
._mutations
.keys():
1335 for col
, dat
in six
.iteritems(row
._mutations
['_removes']):
1336 column
= row
._table
.columns
[col
]
1337 if column
.type.is_map():
1339 opdat
.append(list(dat
))
1345 datum
= data
.Datum
.from_python(column
.type,
1350 self
._substitute
_uuids
(datum
.to_json()))
1351 opdat
.append(inner_opdat
)
1352 mutation
= [col
, "delete", opdat
]
1353 op
["mutations"].append(mutation
)
1355 if '_inserts' in row
._mutations
.keys():
1356 for col
, val
in six
.iteritems(row
._mutations
['_inserts']):
1357 column
= row
._table
.columns
[col
]
1358 if column
.type.is_map():
1360 datum
= data
.Datum
.from_python(column
.type, val
,
1362 opdat
.append(datum
.as_list())
1368 datum
= data
.Datum
.from_python(column
.type,
1373 self
._substitute
_uuids
(datum
.to_json()))
1374 opdat
.append(inner_opdat
)
1375 mutation
= [col
, "insert", opdat
]
1376 op
["mutations"].append(mutation
)
1379 operations
.append(op
)
1382 if self
._fetch
_requests
:
1383 for fetch
in self
._fetch
_requests
:
1384 fetch
["index"] = len(operations
) - 1
1385 operations
.append({"op": "select",
1386 "table": fetch
["row"]._table
.name
,
1387 "where": self
._substitute
_uuids
(
1388 _where_uuid_equals(fetch
["row"].uuid
)),
1389 "columns": [fetch
["column_name"]]})
1393 if self
._inc
_row
and any_updates
:
1394 self
._inc
_index
= len(operations
) - 1
1396 operations
.append({"op": "mutate",
1397 "table": self
._inc
_row
._table
.name
,
1398 "where": self
._substitute
_uuids
(
1399 _where_uuid_equals(self
._inc
_row
.uuid
)),
1400 "mutations": [[self
._inc
_column
, "+=", 1]]})
1401 operations
.append({"op": "select",
1402 "table": self
._inc
_row
._table
.name
,
1403 "where": self
._substitute
_uuids
(
1404 _where_uuid_equals(self
._inc
_row
.uuid
)),
1405 "columns": [self
._inc
_column
]})
1409 operations
.append({"op": "comment",
1410 "comment": "\n".join(self
._comments
)})
1414 operations
.append({"op": "abort"})
1417 self
._status
= Transaction
.UNCHANGED
1419 msg
= ovs
.jsonrpc
.Message
.create_request("transact", operations
)
1420 self
._request
_id
= msg
.id
1421 if not self
.idl
._session
.send(msg
):
1422 self
.idl
._outstanding
_txns
[self
._request
_id
] = self
1423 self
._status
= Transaction
.INCOMPLETE
1425 self
._status
= Transaction
.TRY_AGAIN
1427 self
.__disassemble
()
1430 def commit_block(self
):
1431 """Attempts to commit this transaction, blocking until the commit
1432 either succeeds or fails. Returns the final commit status, which may
1433 be any Transaction.* value other than Transaction.INCOMPLETE.
1435 This function calls Idl.run() on this transaction'ss IDL, so it may
1436 cause Idl.change_seqno to change."""
1438 status
= self
.commit()
1439 if status
!= Transaction
.INCOMPLETE
:
1444 poller
= ovs
.poller
.Poller()
1445 self
.idl
.wait(poller
)
1449 def get_increment_new_value(self
):
1450 """Returns the final (incremented) value of the column in this
1451 transaction that was set to be incremented by Row.increment. This
1452 transaction must have committed successfully."""
1453 assert self
._status
== Transaction
.SUCCESS
1454 return self
._inc
_new
_value
1457 """Aborts this transaction. If Transaction.commit() has already been
1458 called then the transaction might get committed anyhow."""
1459 self
.__disassemble
()
1460 if self
._status
in (Transaction
.UNCOMMITTED
,
1461 Transaction
.INCOMPLETE
):
1462 self
._status
= Transaction
.ABORTED
1464 def get_error(self
):
1465 """Returns a string representing this transaction's current status,
1466 suitable for use in log messages."""
1467 if self
._status
!= Transaction
.ERROR
:
1468 return Transaction
.status_to_string(self
._status
)
1472 return "no error details available"
1474 def __set_error_json(self
, json
):
1475 if self
._error
is None:
1476 self
._error
= ovs
.json
.to_string(json
)
1478 def get_insert_uuid(self
, uuid
):
1479 """Finds and returns the permanent UUID that the database assigned to a
1480 newly inserted row, given the UUID that Transaction.insert() assigned
1481 locally to that row.
1483 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1484 or if it was assigned by that function and then deleted by Row.delete()
1485 within the same transaction. (Rows that are inserted and then deleted
1486 within a single transaction are never sent to the database server, so
1487 it never assigns them a permanent UUID.)
1489 This transaction must have completed successfully."""
1490 assert self
._status
in (Transaction
.SUCCESS
,
1491 Transaction
.UNCHANGED
)
1492 inserted_row
= self
._inserted
_rows
.get(uuid
)
1494 return inserted_row
.real
1497 def _increment(self
, row
, column
):
1498 assert not self
._inc
_row
1500 self
._inc
_column
= column
1502 def _fetch(self
, row
, column_name
):
1503 self
._fetch
_requests
.append({"row": row
, "column_name": column_name
})
1505 def _write(self
, row
, column
, datum
):
1506 assert row
._changes
is not None
1507 assert row
._mutations
is not None
1511 # If this is a write-only column and the datum being written is the
1512 # same as the one already there, just skip the update entirely. This
1513 # is worth optimizing because we have a lot of columns that get
1514 # periodically refreshed into the database but don't actually change
1517 # We don't do this for read/write columns because that would break
1518 # atomicity of transactions--some other client might have written a
1519 # different value in that column since we read it. (But if a whole
1520 # transaction only does writes of existing values, without making any
1521 # real changes, we will drop the whole transaction later in
1522 # ovsdb_idl_txn_commit().)
1523 if (not column
.alert
and row
._data
and
1524 row
._data
.get(column
.name
) == datum
):
1525 new_value
= row
._changes
.get(column
.name
)
1526 if new_value
is None or new_value
== datum
:
1529 txn
._txn
_rows
[row
.uuid
] = row
1530 if '_inserts' in row
._mutations
:
1531 row
._mutations
['_inserts'].pop(column
.name
, None)
1532 if '_removes' in row
._mutations
:
1533 row
._mutations
['_removes'].pop(column
.name
, None)
1534 row
._changes
[column
.name
] = datum
.copy()
1536 def insert(self
, table
, new_uuid
=None):
1537 """Inserts and returns a new row in 'table', which must be one of the
1538 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1540 The new row is assigned a provisional UUID. If 'uuid' is None then one
1541 is randomly generated; otherwise 'uuid' should specify a randomly
1542 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
1543 different UUID when 'txn' is committed, but the IDL will replace any
1544 uses of the provisional UUID in the data to be to be committed by the
1545 UUID assigned by ovsdb-server."""
1546 assert self
._status
== Transaction
.UNCOMMITTED
1547 if new_uuid
is None:
1548 new_uuid
= uuid
.uuid4()
1549 row
= Row(self
.idl
, table
, new_uuid
, None)
1550 table
.rows
[row
.uuid
] = row
1551 self
._txn
_rows
[row
.uuid
] = row
1554 def _process_reply(self
, msg
):
1555 if msg
.type == ovs
.jsonrpc
.Message
.T_ERROR
:
1556 self
._status
= Transaction
.ERROR
1557 elif not isinstance(msg
.result
, (list, tuple)):
1559 vlog
.warn('reply to "transact" is not JSON array')
1568 # This isn't an error in itself but indicates that some
1569 # prior operation failed, so make sure that we know about
1572 elif isinstance(op
, dict):
1573 error
= op
.get("error")
1574 if error
is not None:
1575 if error
== "timed out":
1577 elif error
== "not owner":
1579 elif error
== "aborted":
1583 self
.__set
_error
_json
(op
)
1586 self
.__set
_error
_json
(op
)
1588 vlog
.warn("operation reply is not JSON null or object")
1590 if not soft_errors
and not hard_errors
and not lock_errors
:
1591 if self
._inc
_row
and not self
.__process
_inc
_reply
(ops
):
1593 if self
._fetch
_requests
:
1594 if self
.__process
_fetch
_reply
(ops
):
1595 self
.idl
.change_seqno
+= 1
1599 for insert
in six
.itervalues(self
._inserted
_rows
):
1600 if not self
.__process
_insert
_reply
(insert
, ops
):
1604 self
._status
= Transaction
.ERROR
1606 self
._status
= Transaction
.NOT_LOCKED
1608 self
._status
= Transaction
.TRY_AGAIN
1610 self
._status
= Transaction
.SUCCESS
1613 def __check_json_type(json
, types
, name
):
1616 vlog
.warn("%s is missing" % name
)
1618 elif not isinstance(json
, tuple(types
)):
1620 vlog
.warn("%s has unexpected type %s" % (name
, type(json
)))
1625 def __process_fetch_reply(self
, ops
):
1627 for fetch_request
in self
._fetch
_requests
:
1628 row
= fetch_request
["row"]
1629 column_name
= fetch_request
["column_name"]
1630 index
= fetch_request
["index"]
1634 fetched_rows
= select
.get("rows")
1635 if not Transaction
.__check
_json
_type
(fetched_rows
, (list, tuple),
1636 '"select" reply "rows"'):
1638 if len(fetched_rows
) != 1:
1640 vlog
.warn('"select" reply "rows" has %d elements '
1641 'instead of 1' % len(fetched_rows
))
1643 fetched_row
= fetched_rows
[0]
1644 if not Transaction
.__check
_json
_type
(fetched_row
, (dict,),
1645 '"select" reply row'):
1648 column
= table
.columns
.get(column_name
)
1649 datum_json
= fetched_row
.get(column_name
)
1650 datum
= data
.Datum
.from_json(column
.type, datum_json
)
1652 row
._data
[column_name
] = datum
1657 def __process_inc_reply(self
, ops
):
1658 if self
._inc
_index
+ 2 > len(ops
):
1660 vlog
.warn("reply does not contain enough operations for "
1661 "increment (has %d, needs %d)" %
1662 (len(ops
), self
._inc
_index
+ 2))
1664 # We know that this is a JSON object because the loop in
1665 # __process_reply() already checked.
1666 mutate
= ops
[self
._inc
_index
]
1667 count
= mutate
.get("count")
1668 if not Transaction
.__check
_json
_type
(count
, six
.integer_types
,
1669 '"mutate" reply "count"'):
1673 vlog
.warn('"mutate" reply "count" is %d instead of 1' % count
)
1676 select
= ops
[self
._inc
_index
+ 1]
1677 rows
= select
.get("rows")
1678 if not Transaction
.__check
_json
_type
(rows
, (list, tuple),
1679 '"select" reply "rows"'):
1683 vlog
.warn('"select" reply "rows" has %d elements '
1684 'instead of 1' % len(rows
))
1687 if not Transaction
.__check
_json
_type
(row
, (dict,),
1688 '"select" reply row'):
1690 column
= row
.get(self
._inc
_column
)
1691 if not Transaction
.__check
_json
_type
(column
, six
.integer_types
,
1692 '"select" reply inc column'):
1694 self
._inc
_new
_value
= column
1697 def __process_insert_reply(self
, insert
, ops
):
1698 if insert
.op_index
>= len(ops
):
1700 vlog
.warn("reply does not contain enough operations "
1701 "for insert (has %d, needs %d)"
1702 % (len(ops
), insert
.op_index
))
1705 # We know that this is a JSON object because the loop in
1706 # __process_reply() already checked.
1707 reply
= ops
[insert
.op_index
]
1708 json_uuid
= reply
.get("uuid")
1709 if not Transaction
.__check
_json
_type
(json_uuid
, (tuple, list),
1710 '"insert" reply "uuid"'):
1714 uuid_
= ovs
.ovsuuid
.from_json(json_uuid
)
1717 vlog
.warn('"insert" reply "uuid" is not a JSON UUID')
class SchemaHelper(object):
    """IDL Schema helper.

    This class encapsulates the logic required to generate schemas suitable
    for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
    they are interested in using register_columns().  When finished, the
    get_idl_schema() function may be called.

    The location on disk of the schema used may be found in the
    'schema_location' variable."""
1735 def __init__(self
, location
=None, schema_json
=None):
1736 """Creates a new Schema object.
1738 'location' file path to ovs schema. None means default location
1739 'schema_json' schema in json preresentation in memory
1742 if location
and schema_json
:
1743 raise ValueError("both location and schema_json can't be "
1744 "specified. it's ambiguous.")
1745 if schema_json
is None:
1746 if location
is None:
1747 location
= "%s/vswitch.ovsschema" % ovs
.dirs
.PKGDATADIR
1748 schema_json
= ovs
.json
.from_file(location
)
1750 self
.schema_json
= schema_json
1755 def register_columns(self
, table
, columns
, readonly
=[]):
1756 """Registers interest in the given 'columns' of 'table'. Future calls
1757 to get_idl_schema() will include 'table':column for each column in
1758 'columns'. This function automatically avoids adding duplicate entries
1760 A subset of 'columns' can be specified as 'readonly'. The readonly
1761 columns are not replicated but can be fetched on-demand by the user
1764 'table' must be a string.
1765 'columns' must be a list of strings.
1766 'readonly' must be a list of strings.
1769 assert isinstance(table
, six
.string_types
)
1770 assert isinstance(columns
, list)
1772 columns
= set(columns
) | self
._tables
.get(table
, set())
1773 self
._tables
[table
] = columns
1774 self
._readonly
[table
] = readonly
1776 def register_table(self
, table
):
1777 """Registers interest in the given all columns of 'table'. Future calls
1778 to get_idl_schema() will include all columns of 'table'.
1780 'table' must be a string
1782 assert isinstance(table
, six
.string_types
)
1783 self
._tables
[table
] = set() # empty set means all columns in the table
1785 def register_all(self
):
1786 """Registers interest in every column of every table."""
1789 def get_idl_schema(self
):
1790 """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1791 object based on columns registered using the register_columns()
1794 schema
= ovs
.db
.schema
.DbSchema
.from_json(self
.schema_json
)
1795 self
.schema_json
= None
1799 for table
, columns
in six
.iteritems(self
._tables
):
1800 schema_tables
[table
] = (
1801 self
._keep
_table
_columns
(schema
, table
, columns
))
1803 schema
.tables
= schema_tables
1804 schema
.readonly
= self
._readonly
1807 def _keep_table_columns(self
, schema
, table_name
, columns
):
1808 assert table_name
in schema
.tables
1809 table
= schema
.tables
[table_name
]
1812 # empty set means all columns in the table
1816 for column_name
in columns
:
1817 assert isinstance(column_name
, six
.string_types
)
1818 assert column_name
in table
.columns
1820 new_columns
[column_name
] = table
.columns
[column_name
]
1822 table
.columns
= new_columns