]> git.proxmox.com Git - mirror_ovs.git/blame - python/ovs/db/idl.py
ovsdb-idl: Use modern form of <monitor-requests>.
[mirror_ovs.git] / python / ovs / db / idl.py
CommitLineData
0164e367 1# Copyright (c) 2009, 2010, 2011, 2012, 2013, 2016 Nicira, Inc.
99155935
BP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at:
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
fbafc3c2 15import functools
8cdf0349 16import uuid
99155935 17
a59912a0 18import ovs.db.data as data
4c0f6271 19import ovs.db.parser
99155935 20import ovs.db.schema
6c7050b5 21import ovs.jsonrpc
99155935 22import ovs.ovsuuid
8cdf0349 23import ovs.poller
3a656eaf 24import ovs.vlog
6c7050b5 25from ovs.db import error
26
27import six
3a656eaf
EJ
28
29vlog = ovs.vlog.Vlog("idl")
99155935 30
26bb0f31
EJ
31__pychecker__ = 'no-classattr no-objattrs'
32
d7d417fc
TW
33ROW_CREATE = "create"
34ROW_UPDATE = "update"
35ROW_DELETE = "delete"
26bb0f31 36
897c8064
LS
37OVSDB_UPDATE = 0
38OVSDB_UPDATE2 = 1
39
d7d417fc
TW
40
41class Idl(object):
99155935
BP
42 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
43
44 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
45 requests to an OVSDB database server and parses the responses, converting
46 raw JSON into data structures that are easier for clients to digest.
47
48 The IDL also assists with issuing database transactions. The client
49 creates a transaction, manipulates the IDL data structures, and commits or
50 aborts the transaction. The IDL then composes and issues the necessary
51 JSON-RPC requests and reports to the client whether the transaction
52 completed successfully.
53
8cdf0349
BP
54 The client is allowed to access the following attributes directly, in a
55 read-only fashion:
99155935 56
8cdf0349
BP
57 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
58 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
59 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
60 to a Row object.
61
62 The client may directly read and write the Row objects referenced by the
63 'rows' map values. Refer to Row for more details.
64
65 - 'change_seqno': A number that represents the IDL's state. When the IDL
2f926787
BP
66 is updated (by Idl.run()), its value changes. The sequence number can
67 occasionally change even if the database does not. This happens if the
68 connection to the database drops and reconnects, which causes the
69 database contents to be reloaded even if they didn't change. (It could
70 also happen if the database server sends out a "change" that reflects
71 what the IDL already thought was in the database. The database server is
72 not supposed to do that, but bugs could in theory cause it to do so.)
8cdf0349
BP
73
74 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
75 if no lock is configured.
76
77 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
78 lock, and False otherwise.
79
80 Locking and unlocking happens asynchronously from the database client's
81 point of view, so the information is only useful for optimization
82 (e.g. if the client doesn't have the lock then there's no point in trying
83 to write to the database).
84
85 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
86 the database server has indicated that some other client already owns the
87 requested lock, and False otherwise.
88
89 - 'txn': The ovs.db.idl.Transaction object for the database transaction
90 currently being constructed, if there is one, or None otherwise.
91"""
92
897c8064
LS
93 IDL_S_INITIAL = 0
94 IDL_S_MONITOR_REQUESTED = 1
95 IDL_S_MONITOR_COND_REQUESTED = 2
96
f73d562f 97 def __init__(self, remote, schema, probe_interval=None):
99155935
BP
98 """Creates and returns a connection to the database named 'db_name' on
99 'remote', which should be in a form acceptable to
100 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
8cdf0349
BP
101 replica of the remote database.
102
103 'schema' should be the schema for the remote database. The caller may
104 have cut it down by removing tables or columns that are not of
105 interest. The IDL will only replicate the tables and columns that
106 remain. The caller may also add a attribute named 'alert' to selected
107 remaining columns, setting its value to False; if so, then changes to
108 those columns will not be considered changes to the database for the
109 purpose of the return value of Idl.run() and Idl.change_seqno. This is
110 useful for columns that the IDL's client will write but not read.
111
ad0991e6
EJ
112 As a convenience to users, 'schema' may also be an instance of the
113 SchemaHelper class.
114
f73d562f
LAG
115 The IDL uses and modifies 'schema' directly.
116
117 If "probe_interval" is zero it disables the connection keepalive
118 feature. If non-zero the value will be forced to at least 1000
119 milliseconds. If None it will just use the default value in OVS.
120 """
8cdf0349 121
bf42f674
EJ
122 assert isinstance(schema, SchemaHelper)
123 schema = schema.get_idl_schema()
ad0991e6 124
8cdf0349 125 self.tables = schema.tables
80c12152 126 self.readonly = schema.readonly
8cdf0349 127 self._db = schema
f73d562f
LAG
128 self._session = ovs.jsonrpc.Session.open(remote,
129 probe_interval=probe_interval)
8cdf0349
BP
130 self._monitor_request_id = None
131 self._last_seqno = None
99155935 132 self.change_seqno = 0
897c8064
LS
133 self.uuid = uuid.uuid1()
134 self.state = self.IDL_S_INITIAL
8cdf0349
BP
135
136 # Database locking.
137 self.lock_name = None # Name of lock we need, None if none.
138 self.has_lock = False # Has db server said we have the lock?
26bb0f31 139 self.is_lock_contended = False # Has db server said we can't get lock?
8cdf0349
BP
140 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
141
142 # Transaction support.
143 self.txn = None
144 self._outstanding_txns = {}
145
cb96c1b2
RB
146 for table in six.itervalues(schema.tables):
147 for column in six.itervalues(table.columns):
8cdf0349
BP
148 if not hasattr(column, 'alert'):
149 column.alert = True
150 table.need_table = False
151 table.rows = {}
152 table.idl = self
0164e367 153 table.condition = [True]
16ebb90e 154 table.cond_changed = False
99155935
BP
155
156 def close(self):
8cdf0349
BP
157 """Closes the connection to the database. The IDL will no longer
158 update."""
159 self._session.close()
99155935
BP
160
161 def run(self):
162 """Processes a batch of messages from the database server. Returns
163 True if the database as seen through the IDL changed, False if it did
164 not change. The initial fetch of the entire contents of the remote
8cdf0349
BP
165 database is considered to be one kind of change. If the IDL has been
166 configured to acquire a database lock (with Idl.set_lock()), then
167 successfully acquiring the lock is also considered to be a change.
99155935
BP
168
169 This function can return occasional false positives, that is, report
170 that the database changed even though it didn't. This happens if the
171 connection to the database drops and reconnects, which causes the
172 database contents to be reloaded even if they didn't change. (It could
173 also happen if the database server sends out a "change" that reflects
174 what we already thought was in the database, but the database server is
175 not supposed to do that.)
176
177 As an alternative to checking the return value, the client may check
8cdf0349
BP
178 for changes in self.change_seqno."""
179 assert not self.txn
99155935 180 initial_change_seqno = self.change_seqno
16ebb90e
LS
181
182 self.send_cond_change()
8cdf0349
BP
183 self._session.run()
184 i = 0
185 while i < 50:
186 i += 1
187 if not self._session.is_connected():
188 break
189
190 seqno = self._session.get_seqno()
191 if seqno != self._last_seqno:
192 self._last_seqno = seqno
193 self.__txn_abort_all()
194 self.__send_monitor_request()
195 if self.lock_name:
196 self.__send_lock_request()
197 break
198
199 msg = self._session.recv()
200 if msg is None:
201 break
202 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
897c8064
LS
203 and msg.method == "update2"
204 and len(msg.params) == 2):
205 # Database contents changed.
206 self.__parse_update(msg.params[1], OVSDB_UPDATE2)
207 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
208 and msg.method == "update"
209 and len(msg.params) == 2):
8cdf0349 210 # Database contents changed.
897c8064 211 self.__parse_update(msg.params[1], OVSDB_UPDATE)
8cdf0349
BP
212 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
213 and self._monitor_request_id is not None
214 and self._monitor_request_id == msg.id):
215 # Reply to our "monitor" request.
216 try:
217 self.change_seqno += 1
218 self._monitor_request_id = None
219 self.__clear()
897c8064
LS
220 if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
221 self.__parse_update(msg.result, OVSDB_UPDATE2)
222 else:
223 assert self.state == self.IDL_S_MONITOR_REQUESTED
224 self.__parse_update(msg.result, OVSDB_UPDATE)
225
3ab76c56 226 except error.Error as e:
3a656eaf 227 vlog.err("%s: parse error in received schema: %s"
897c8064 228 % (self._session.get_name(), e))
8cdf0349
BP
229 self.__error()
230 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
231 and self._lock_request_id is not None
232 and self._lock_request_id == msg.id):
233 # Reply to our "lock" request.
234 self.__parse_lock_reply(msg.result)
235 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
236 and msg.method == "locked"):
237 # We got our lock.
238 self.__parse_lock_notify(msg.params, True)
239 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
240 and msg.method == "stolen"):
241 # Someone else stole our lock.
242 self.__parse_lock_notify(msg.params, False)
243 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
244 # Reply to our echo request. Ignore it.
245 pass
897c8064
LS
246 elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
247 self.state == self.IDL_S_MONITOR_COND_REQUESTED and
248 self._monitor_request_id == msg.id):
249 if msg.error == "unknown method":
250 self.__send_monitor_request()
8cdf0349
BP
251 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
252 ovs.jsonrpc.Message.T_REPLY)
253 and self.__txn_process_reply(msg)):
254 # __txn_process_reply() did everything needed.
255 pass
256 else:
257 # This can happen if a transaction is destroyed before we
258 # receive the reply, so keep the log level low.
3a656eaf
EJ
259 vlog.dbg("%s: received unexpected %s message"
260 % (self._session.get_name(),
261 ovs.jsonrpc.Message.type_to_string(msg.type)))
8cdf0349 262
99155935
BP
263 return initial_change_seqno != self.change_seqno
264
16ebb90e
LS
265 def send_cond_change(self):
266 if not self._session.is_connected():
267 return
268
269 for table in six.itervalues(self.tables):
270 if table.cond_changed:
271 self.__send_cond_change(table, table.condition)
272 table.cond_changed = False
273
0164e367
BP
274 def cond_change(self, table_name, cond):
275 """Sets the condition for 'table_name' to 'cond', which should be a
276 conditional expression suitable for use directly in the OVSDB
277 protocol, with the exception that the empty condition []
278 matches no rows (instead of matching every row). That is, []
279 is equivalent to [False], not to [True].
280 """
16ebb90e 281
897c8064
LS
282 table = self.tables.get(table_name)
283 if not table:
284 raise error.Error('Unknown table "%s"' % table_name)
16ebb90e 285
0164e367
BP
286 if cond == []:
287 cond = [False]
288 if table.condition != cond:
289 table.condition = cond
290 table.cond_changed = True
897c8064 291
99155935
BP
292 def wait(self, poller):
293 """Arranges for poller.block() to wake up when self.run() has something
294 to do or when activity occurs on a transaction on 'self'."""
8cdf0349
BP
295 self._session.wait(poller)
296 self._session.recv_wait(poller)
99155935 297
8cdf0349
BP
298 def has_ever_connected(self):
299 """Returns True, if the IDL successfully connected to the remote
300 database and retrieved its contents (even if the connection
301 subsequently dropped and is in the process of reconnecting). If so,
302 then the IDL contains an atomic snapshot of the database's contents
303 (but it might be arbitrarily old if the connection dropped).
304
305 Returns False if the IDL has never connected or retrieved the
306 database's contents. If so, the IDL is empty."""
307 return self.change_seqno != 0
308
309 def force_reconnect(self):
310 """Forces the IDL to drop its connection to the database and reconnect.
311 In the meantime, the contents of the IDL will not change."""
312 self._session.force_reconnect()
313
314 def set_lock(self, lock_name):
315 """If 'lock_name' is not None, configures the IDL to obtain the named
316 lock from the database server and to avoid modifying the database when
317 the lock cannot be acquired (that is, when another client has the same
318 lock).
319
320 If 'lock_name' is None, drops the locking requirement and releases the
321 lock."""
322 assert not self.txn
323 assert not self._outstanding_txns
324
325 if self.lock_name and (not lock_name or lock_name != self.lock_name):
326 # Release previous lock.
327 self.__send_unlock_request()
328 self.lock_name = None
329 self.is_lock_contended = False
330
331 if lock_name and not self.lock_name:
332 # Acquire new lock.
333 self.lock_name = lock_name
334 self.__send_lock_request()
335
d7d417fc
TW
336 def notify(self, event, row, updates=None):
337 """Hook for implementing create/update/delete notifications
338
339 :param event: The event that was triggered
340 :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
341 :param row: The row as it is after the operation has occured
342 :type row: Row
a7261bf7
NS
343 :param updates: For updates, row with only old values of the changed
344 columns
d7d417fc
TW
345 :type updates: Row
346 """
347
897c8064
LS
348 def __send_cond_change(self, table, cond):
349 monitor_cond_change = {table.name: [{"where": cond}]}
350 old_uuid = str(self.uuid)
351 self.uuid = uuid.uuid1()
352 params = [old_uuid, str(self.uuid), monitor_cond_change]
353 msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
354 self._session.send(msg)
355
8cdf0349
BP
356 def __clear(self):
357 changed = False
358
cb96c1b2 359 for table in six.itervalues(self.tables):
8cdf0349
BP
360 if table.rows:
361 changed = True
362 table.rows = {}
99155935 363
8cdf0349
BP
364 if changed:
365 self.change_seqno += 1
366
367 def __update_has_lock(self, new_has_lock):
368 if new_has_lock and not self.has_lock:
369 if self._monitor_request_id is None:
370 self.change_seqno += 1
371 else:
372 # We're waiting for a monitor reply, so don't signal that the
373 # database changed. The monitor reply will increment
374 # change_seqno anyhow.
375 pass
376 self.is_lock_contended = False
377 self.has_lock = new_has_lock
378
379 def __do_send_lock_request(self, method):
380 self.__update_has_lock(False)
381 self._lock_request_id = None
382 if self._session.is_connected():
383 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
384 msg_id = msg.id
385 self._session.send(msg)
386 else:
387 msg_id = None
388 return msg_id
389
390 def __send_lock_request(self):
391 self._lock_request_id = self.__do_send_lock_request("lock")
392
393 def __send_unlock_request(self):
394 self.__do_send_lock_request("unlock")
395
396 def __parse_lock_reply(self, result):
397 self._lock_request_id = None
da2d45c6 398 got_lock = isinstance(result, dict) and result.get("locked") is True
8cdf0349
BP
399 self.__update_has_lock(got_lock)
400 if not got_lock:
401 self.is_lock_contended = True
402
403 def __parse_lock_notify(self, params, new_has_lock):
404 if (self.lock_name is not None
da2d45c6 405 and isinstance(params, (list, tuple))
8cdf0349
BP
406 and params
407 and params[0] == self.lock_name):
e9ad9219 408 self.__update_has_lock(new_has_lock)
8cdf0349
BP
409 if not new_has_lock:
410 self.is_lock_contended = True
99155935
BP
411
412 def __send_monitor_request(self):
897c8064
LS
413 if self.state == self.IDL_S_INITIAL:
414 self.state = self.IDL_S_MONITOR_COND_REQUESTED
415 method = "monitor_cond"
416 else:
417 self.state = self.IDL_S_MONITOR_REQUESTED
418 method = "monitor"
419
99155935 420 monitor_requests = {}
cb96c1b2 421 for table in six.itervalues(self.tables):
80c12152 422 columns = []
cb96c1b2 423 for column in six.iterkeys(table.columns):
80c12152 424 if ((table.name not in self.readonly) or
897c8064
LS
425 (table.name in self.readonly) and
426 (column not in self.readonly[table.name])):
80c12152 427 columns.append(column)
62bba609 428 monitor_request = {"columns": columns}
0164e367 429 if method == "monitor_cond" and table.condition != [True]:
62bba609 430 monitor_request["where"] = table.condition
16ebb90e 431 table.cond_change = False
62bba609 432 monitor_requests[table.name] = [monitor_request]
897c8064 433
99155935 434 msg = ovs.jsonrpc.Message.create_request(
897c8064 435 method, [self._db.name, str(self.uuid), monitor_requests])
8cdf0349
BP
436 self._monitor_request_id = msg.id
437 self._session.send(msg)
99155935 438
897c8064 439 def __parse_update(self, update, version):
99155935 440 try:
897c8064 441 self.__do_parse_update(update, version)
3ab76c56 442 except error.Error as e:
3a656eaf
EJ
443 vlog.err("%s: error parsing update: %s"
444 % (self._session.get_name(), e))
99155935 445
897c8064 446 def __do_parse_update(self, table_updates, version):
da2d45c6 447 if not isinstance(table_updates, dict):
99155935
BP
448 raise error.Error("<table-updates> is not an object",
449 table_updates)
450
cb96c1b2 451 for table_name, table_update in six.iteritems(table_updates):
8cdf0349 452 table = self.tables.get(table_name)
99155935 453 if not table:
f2d8ad13
BP
454 raise error.Error('<table-updates> includes unknown '
455 'table "%s"' % table_name)
99155935 456
da2d45c6 457 if not isinstance(table_update, dict):
f2d8ad13
BP
458 raise error.Error('<table-update> for table "%s" is not '
459 'an object' % table_name, table_update)
99155935 460
cb96c1b2 461 for uuid_string, row_update in six.iteritems(table_update):
49c541dc 462 if not ovs.ovsuuid.is_valid_string(uuid_string):
f2d8ad13
BP
463 raise error.Error('<table-update> for table "%s" '
464 'contains bad UUID "%s" as member '
465 'name' % (table_name, uuid_string),
99155935 466 table_update)
49c541dc 467 uuid = ovs.ovsuuid.from_string(uuid_string)
99155935 468
da2d45c6 469 if not isinstance(row_update, dict):
f2d8ad13
BP
470 raise error.Error('<table-update> for table "%s" '
471 'contains <row-update> for %s that '
472 'is not an object'
99155935
BP
473 % (table_name, uuid_string))
474
897c8064
LS
475 if version == OVSDB_UPDATE2:
476 if self.__process_update2(table, uuid, row_update):
477 self.change_seqno += 1
478 continue
479
af1eba26 480 parser = ovs.db.parser.Parser(row_update, "row-update")
4c0f6271
BP
481 old = parser.get_optional("old", [dict])
482 new = parser.get_optional("new", [dict])
483 parser.finish()
99155935 484
99155935 485 if not old and not new:
f2d8ad13
BP
486 raise error.Error('<row-update> missing "old" and '
487 '"new" members', row_update)
99155935 488
8cdf0349 489 if self.__process_update(table, uuid, old, new):
99155935
BP
490 self.change_seqno += 1
491
897c8064
LS
492 def __process_update2(self, table, uuid, row_update):
493 row = table.rows.get(uuid)
494 changed = False
495 if "delete" in row_update:
496 if row:
497 del table.rows[uuid]
498 self.notify(ROW_DELETE, row)
499 changed = True
500 else:
501 # XXX rate-limit
502 vlog.warn("cannot delete missing row %s from table"
503 "%s" % (uuid, table.name))
504 elif "insert" in row_update or "initial" in row_update:
505 if row:
506 vlog.warn("cannot add existing row %s from table"
507 " %s" % (uuid, table.name))
508 del table.rows[uuid]
509 row = self.__create_row(table, uuid)
510 if "insert" in row_update:
511 row_update = row_update['insert']
512 else:
513 row_update = row_update['initial']
514 self.__add_default(table, row_update)
515 if self.__row_update(table, row, row_update):
516 changed = True
517 self.notify(ROW_CREATE, row)
518 elif "modify" in row_update:
519 if not row:
520 raise error.Error('Modify non-existing row')
521
eab13876
DA
522 old_row = self.__apply_diff(table, row, row_update['modify'])
523 self.notify(ROW_UPDATE, row, Row(self, table, uuid, old_row))
897c8064
LS
524 changed = True
525 else:
526 raise error.Error('<row-update> unknown operation',
527 row_update)
528 return changed
529
8cdf0349 530 def __process_update(self, table, uuid, old, new):
99155935 531 """Returns True if a column changed, False otherwise."""
8cdf0349 532 row = table.rows.get(uuid)
7d48f8f8 533 changed = False
99155935
BP
534 if not new:
535 # Delete row.
536 if row:
8cdf0349 537 del table.rows[uuid]
7d48f8f8 538 changed = True
d7d417fc 539 self.notify(ROW_DELETE, row)
99155935
BP
540 else:
541 # XXX rate-limit
3a656eaf
EJ
542 vlog.warn("cannot delete missing row %s from table %s"
543 % (uuid, table.name))
99155935
BP
544 elif not old:
545 # Insert row.
546 if not row:
547 row = self.__create_row(table, uuid)
7d48f8f8 548 changed = True
99155935
BP
549 else:
550 # XXX rate-limit
3a656eaf
EJ
551 vlog.warn("cannot add existing row %s to table %s"
552 % (uuid, table.name))
8cdf0349 553 if self.__row_update(table, row, new):
7d48f8f8 554 changed = True
d7d417fc 555 self.notify(ROW_CREATE, row)
99155935 556 else:
d7d417fc 557 op = ROW_UPDATE
99155935
BP
558 if not row:
559 row = self.__create_row(table, uuid)
7d48f8f8 560 changed = True
d7d417fc 561 op = ROW_CREATE
99155935 562 # XXX rate-limit
3a656eaf
EJ
563 vlog.warn("cannot modify missing row %s in table %s"
564 % (uuid, table.name))
8cdf0349 565 if self.__row_update(table, row, new):
7d48f8f8 566 changed = True
d7d417fc 567 self.notify(op, row, Row.from_json(self, table, uuid, old))
7d48f8f8 568 return changed
99155935 569
897c8064
LS
570 def __column_name(self, column):
571 if column.type.key.type == ovs.db.types.UuidType:
572 return ovs.ovsuuid.to_json(column.type.key.type.default)
573 else:
574 return column.type.key.type.default
575
576 def __add_default(self, table, row_update):
577 for column in six.itervalues(table.columns):
578 if column.name not in row_update:
579 if ((table.name not in self.readonly) or
580 (table.name in self.readonly) and
581 (column.name not in self.readonly[table.name])):
582 if column.type.n_min != 0 and not column.type.is_map():
583 row_update[column.name] = self.__column_name(column)
584
585 def __apply_diff(self, table, row, row_diff):
eab13876 586 old_row = {}
a7261bf7 587 for column_name, datum_diff_json in six.iteritems(row_diff):
897c8064
LS
588 column = table.columns.get(column_name)
589 if not column:
590 # XXX rate-limit
591 vlog.warn("unknown column %s updating table %s"
592 % (column_name, table.name))
593 continue
594
595 try:
a59912a0 596 datum_diff = data.Datum.from_json(column.type, datum_diff_json)
897c8064
LS
597 except error.Error as e:
598 # XXX rate-limit
599 vlog.warn("error parsing column %s in table %s: %s"
600 % (column_name, table.name, e))
601 continue
602
eab13876 603 old_row[column_name] = row._data[column_name].copy()
a7261bf7 604 datum = row._data[column_name].diff(datum_diff)
897c8064
LS
605 if datum != row._data[column_name]:
606 row._data[column_name] = datum
607
eab13876 608 return old_row
a7261bf7 609
8cdf0349 610 def __row_update(self, table, row, row_json):
99155935 611 changed = False
cb96c1b2 612 for column_name, datum_json in six.iteritems(row_json):
99155935
BP
613 column = table.columns.get(column_name)
614 if not column:
615 # XXX rate-limit
3a656eaf
EJ
616 vlog.warn("unknown column %s updating table %s"
617 % (column_name, table.name))
99155935
BP
618 continue
619
620 try:
a59912a0 621 datum = data.Datum.from_json(column.type, datum_json)
3ab76c56 622 except error.Error as e:
99155935 623 # XXX rate-limit
3a656eaf
EJ
624 vlog.warn("error parsing column %s in table %s: %s"
625 % (column_name, table.name, e))
99155935
BP
626 continue
627
8cdf0349
BP
628 if datum != row._data[column_name]:
629 row._data[column_name] = datum
630 if column.alert:
631 changed = True
99155935
BP
632 else:
633 # Didn't really change but the OVSDB monitor protocol always
634 # includes every value in a row.
635 pass
636 return changed
637
99155935 638 def __create_row(self, table, uuid):
8cdf0349 639 data = {}
cb96c1b2 640 for column in six.itervalues(table.columns):
8cdf0349
BP
641 data[column.name] = ovs.db.data.Datum.default(column.type)
642 row = table.rows[uuid] = Row(self, table, uuid, data)
99155935
BP
643 return row
644
8cdf0349
BP
645 def __error(self):
646 self._session.force_reconnect()
647
648 def __txn_abort_all(self):
649 while self._outstanding_txns:
650 txn = self._outstanding_txns.popitem()[1]
854a94d9 651 txn._status = Transaction.TRY_AGAIN
8cdf0349
BP
652
653 def __txn_process_reply(self, msg):
654 txn = self._outstanding_txns.pop(msg.id, None)
655 if txn:
656 txn._process_reply(msg)
beba3d82 657 return True
8cdf0349 658
26bb0f31 659
8cdf0349
BP
660def _uuid_to_row(atom, base):
661 if base.ref_table:
662 return base.ref_table.rows.get(atom)
663 else:
664 return atom
665
26bb0f31 666
8cdf0349 667def _row_to_uuid(value):
da2d45c6 668 if isinstance(value, Row):
8cdf0349
BP
669 return value.uuid
670 else:
671 return value
bf6ec045 672
26bb0f31 673
fbafc3c2 674@functools.total_ordering
bf6ec045 675class Row(object):
8cdf0349
BP
676 """A row within an IDL.
677
678 The client may access the following attributes directly:
679
680 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
681
682 - An attribute for each column in the Row's table, named for the column,
683 whose values are as returned by Datum.to_python() for the column's type.
684
685 If some error occurs (e.g. the database server's idea of the column is
686 different from the IDL's idea), then the attribute values is the
687 "default" value return by Datum.default() for the column's type. (It is
688 important to know this because the default value may violate constraints
689 for the column's type, e.g. the default integer value is 0 even if column
690 contraints require the column's value to be positive.)
691
692 When a transaction is active, column attributes may also be assigned new
693 values. Committing the transaction will then cause the new value to be
694 stored into the database.
695
696 *NOTE*: In the current implementation, the value of a column is a *copy*
697 of the value in the database. This means that modifying its value
698 directly will have no useful effect. For example, the following:
699 row.mycolumn["a"] = "b" # don't do this
700 will not change anything in the database, even after commit. To modify
701 the column, instead assign the modified column value back to the column:
702 d = row.mycolumn
703 d["a"] = "b"
704 row.mycolumn = d
705"""
706 def __init__(self, idl, table, uuid, data):
707 # All of the explicit references to self.__dict__ below are required
708 # to set real attributes with invoking self.__getattr__().
709 self.__dict__["uuid"] = uuid
710
711 self.__dict__["_idl"] = idl
712 self.__dict__["_table"] = table
713
714 # _data is the committed data. It takes the following values:
715 #
716 # - A dictionary that maps every column name to a Datum, if the row
717 # exists in the committed form of the database.
718 #
719 # - None, if this row is newly inserted within the active transaction
720 # and thus has no committed form.
721 self.__dict__["_data"] = data
722
723 # _changes describes changes to this row within the active transaction.
724 # It takes the following values:
725 #
726 # - {}, the empty dictionary, if no transaction is active or if the
727 # row has yet not been changed within this transaction.
728 #
729 # - A dictionary that maps a column name to its new Datum, if an
730 # active transaction changes those columns' values.
731 #
732 # - A dictionary that maps every column name to a Datum, if the row
733 # is newly inserted within the active transaction.
734 #
735 # - None, if this transaction deletes this row.
736 self.__dict__["_changes"] = {}
737
a59912a0
RM
738 # _mutations describes changes to this row to be handled via a
739 # mutate operation on the wire. It takes the following values:
740 #
741 # - {}, the empty dictionary, if no transaction is active or if the
742 # row has yet not been mutated within this transaction.
743 #
744 # - A dictionary that contains two keys:
745 #
746 # - "_inserts" contains a dictionary that maps column names to
747 # new keys/key-value pairs that should be inserted into the
748 # column
749 # - "_removes" contains a dictionary that maps column names to
750 # the keys/key-value pairs that should be removed from the
751 # column
752 #
753 # - None, if this transaction deletes this row.
754 self.__dict__["_mutations"] = {}
755
8cdf0349
BP
756 # A dictionary whose keys are the names of columns that must be
757 # verified as prerequisites when the transaction commits. The values
758 # in the dictionary are all None.
759 self.__dict__["_prereqs"] = {}
760
fbafc3c2
RB
761 def __lt__(self, other):
762 if not isinstance(other, Row):
763 return NotImplemented
764 return bool(self.__dict__['uuid'] < other.__dict__['uuid'])
765
766 def __eq__(self, other):
767 if not isinstance(other, Row):
768 return NotImplemented
769 return bool(self.__dict__['uuid'] == other.__dict__['uuid'])
770
771 def __hash__(self):
772 return int(self.__dict__['uuid'])
773
8cdf0349
BP
774 def __getattr__(self, column_name):
775 assert self._changes is not None
a59912a0 776 assert self._mutations is not None
8cdf0349 777
685e6983
TR
778 try:
779 column = self._table.columns[column_name]
780 except KeyError:
781 raise AttributeError("%s instance has no attribute '%s'" %
782 (self.__class__.__name__, column_name))
8cdf0349 783 datum = self._changes.get(column_name)
a59912a0
RM
784 inserts = None
785 if '_inserts' in self._mutations.keys():
786 inserts = self._mutations['_inserts'].get(column_name)
787 removes = None
788 if '_removes' in self._mutations.keys():
789 removes = self._mutations['_removes'].get(column_name)
8cdf0349 790 if datum is None:
3b4c362f 791 if self._data is None:
a59912a0
RM
792 if inserts is None:
793 raise AttributeError("%s instance has no attribute '%s'" %
794 (self.__class__.__name__,
795 column_name))
796 else:
2d54d801
AB
797 datum = data.Datum.from_python(column.type,
798 inserts,
799 _row_to_uuid)
800 elif column_name in self._data:
80c12152 801 datum = self._data[column_name]
2d54d801
AB
802 if column.type.is_set():
803 dlist = datum.as_list()
a59912a0 804 if inserts is not None:
2d54d801 805 dlist.extend(list(inserts))
a59912a0 806 if removes is not None:
2d54d801
AB
807 removes_datum = data.Datum.from_python(column.type,
808 removes,
809 _row_to_uuid)
810 removes_list = removes_datum.as_list()
811 dlist = [x for x in dlist if x not in removes_list]
812 datum = data.Datum.from_python(column.type, dlist,
813 _row_to_uuid)
814 elif column.type.is_map():
815 dmap = datum.to_python(_uuid_to_row)
a59912a0 816 if inserts is not None:
2d54d801 817 dmap.update(inserts)
a59912a0 818 if removes is not None:
2d54d801
AB
819 for key in removes:
820 if key not in (inserts or {}):
821 del dmap[key]
822 datum = data.Datum.from_python(column.type, dmap,
823 _row_to_uuid)
80c12152 824 else:
a59912a0
RM
825 if inserts is None:
826 raise AttributeError("%s instance has no attribute '%s'" %
827 (self.__class__.__name__,
828 column_name))
829 else:
830 datum = inserts
8cdf0349
BP
831
832 return datum.to_python(_uuid_to_row)
833
834 def __setattr__(self, column_name, value):
835 assert self._changes is not None
836 assert self._idl.txn
837
80c12152 838 if ((self._table.name in self._idl.readonly) and
897c8064 839 (column_name in self._idl.readonly[self._table.name])):
37520ab3
RB
840 vlog.warn("attempting to write to readonly column %s"
841 % column_name)
80c12152
SA
842 return
843
8cdf0349
BP
844 column = self._table.columns[column_name]
845 try:
a59912a0 846 datum = data.Datum.from_python(column.type, value, _row_to_uuid)
3ab76c56 847 except error.Error as e:
8cdf0349 848 # XXX rate-limit
3a656eaf
EJ
849 vlog.err("attempting to write bad value to column %s (%s)"
850 % (column_name, e))
8cdf0349
BP
851 return
852 self._idl.txn._write(self, column, datum)
853
a59912a0
RM
854 def addvalue(self, column_name, key):
855 self._idl.txn._txn_rows[self.uuid] = self
330b9c9c
AB
856 column = self._table.columns[column_name]
857 try:
858 data.Datum.from_python(column.type, key, _row_to_uuid)
859 except error.Error as e:
860 # XXX rate-limit
861 vlog.err("attempting to write bad value to column %s (%s)"
862 % (column_name, e))
863 return
864 inserts = self._mutations.setdefault('_inserts', {})
865 column_value = inserts.setdefault(column_name, set())
866 column_value.add(key)
a59912a0
RM
867
868 def delvalue(self, column_name, key):
869 self._idl.txn._txn_rows[self.uuid] = self
330b9c9c
AB
870 column = self._table.columns[column_name]
871 try:
872 data.Datum.from_python(column.type, key, _row_to_uuid)
873 except error.Error as e:
874 # XXX rate-limit
875 vlog.err("attempting to delete bad value from column %s (%s)"
876 % (column_name, e))
877 return
878 removes = self._mutations.setdefault('_removes', {})
879 column_value = removes.setdefault(column_name, set())
880 column_value.add(key)
a59912a0
RM
881
882 def setkey(self, column_name, key, value):
883 self._idl.txn._txn_rows[self.uuid] = self
884 column = self._table.columns[column_name]
885 try:
330b9c9c 886 data.Datum.from_python(column.type, {key: value}, _row_to_uuid)
a59912a0
RM
887 except error.Error as e:
888 # XXX rate-limit
889 vlog.err("attempting to write bad value to column %s (%s)"
890 % (column_name, e))
891 return
2d54d801 892 if self._data and column_name in self._data:
330b9c9c
AB
893 # Remove existing key/value before updating.
894 removes = self._mutations.setdefault('_removes', {})
895 column_value = removes.setdefault(column_name, set())
896 column_value.add(key)
897 inserts = self._mutations.setdefault('_inserts', {})
898 column_value = inserts.setdefault(column_name, {})
899 column_value[key] = value
900
901 def delkey(self, column_name, key, value=None):
a59912a0 902 self._idl.txn._txn_rows[self.uuid] = self
330b9c9c
AB
903 if value:
904 try:
905 old_value = data.Datum.to_python(self._data[column_name],
906 _uuid_to_row)
907 except error.Error:
908 return
909 if key not in old_value:
910 return
911 if old_value[key] != value:
912 return
913 removes = self._mutations.setdefault('_removes', {})
914 column_value = removes.setdefault(column_name, set())
915 column_value.add(key)
a59912a0
RM
916 return
917
d7d417fc
TW
918 @classmethod
919 def from_json(cls, idl, table, uuid, row_json):
920 data = {}
cb96c1b2 921 for column_name, datum_json in six.iteritems(row_json):
d7d417fc
TW
922 column = table.columns.get(column_name)
923 if not column:
924 # XXX rate-limit
925 vlog.warn("unknown column %s in table %s"
926 % (column_name, table.name))
927 continue
928 try:
929 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
3ab76c56 930 except error.Error as e:
d7d417fc
TW
931 # XXX rate-limit
932 vlog.warn("error parsing column %s in table %s: %s"
933 % (column_name, table.name, e))
934 continue
935 data[column_name] = datum
936 return cls(idl, table, uuid, data)
937
8cdf0349
BP
938 def verify(self, column_name):
939 """Causes the original contents of column 'column_name' in this row to
940 be verified as a prerequisite to completing the transaction. That is,
941 if 'column_name' changed in this row (or if this row was deleted)
942 between the time that the IDL originally read its contents and the time
943 that the transaction commits, then the transaction aborts and
854a94d9 944 Transaction.commit() returns Transaction.TRY_AGAIN.
8cdf0349
BP
945
946 The intention is that, to ensure that no transaction commits based on
947 dirty reads, an application should call Row.verify() on each data item
948 read as part of a read-modify-write operation.
949
950 In some cases Row.verify() reduces to a no-op, because the current
951 value of the column is already known:
952
953 - If this row is a row created by the current transaction (returned
954 by Transaction.insert()).
955
956 - If the column has already been modified within the current
957 transaction.
958
959 Because of the latter property, always call Row.verify() *before*
960 modifying the column, for a given read-modify-write.
961
962 A transaction must be in progress."""
963 assert self._idl.txn
964 assert self._changes is not None
965 if not self._data or column_name in self._changes:
966 return
967
968 self._prereqs[column_name] = None
969
970 def delete(self):
971 """Deletes this row from its table.
972
973 A transaction must be in progress."""
974 assert self._idl.txn
975 assert self._changes is not None
976 if self._data is None:
977 del self._idl.txn._txn_rows[self.uuid]
8d3efc1c
BP
978 else:
979 self._idl.txn._txn_rows[self.uuid] = self
8cdf0349
BP
980 self.__dict__["_changes"] = None
981 del self._table.rows[self.uuid]
982
80c12152
SA
983 def fetch(self, column_name):
984 self._idl.txn._fetch(self, column_name)
985
94fbe1aa
BP
986 def increment(self, column_name):
987 """Causes the transaction, when committed, to increment the value of
988 'column_name' within this row by 1. 'column_name' must have an integer
989 type. After the transaction commits successfully, the client may
990 retrieve the final (incremented) value of 'column_name' with
991 Transaction.get_increment_new_value().
992
993 The client could accomplish something similar by reading and writing
994 and verify()ing columns. However, increment() will never (by itself)
995 cause a transaction to fail because of a verify error.
996
997 The intended use is for incrementing the "next_cfg" column in
998 the Open_vSwitch table."""
999 self._idl.txn._increment(self, column_name)
1000
26bb0f31 1001
8cdf0349
BP
1002def _uuid_name_from_uuid(uuid):
1003 return "row%s" % str(uuid).replace("-", "_")
1004
26bb0f31 1005
8cdf0349
BP
1006def _where_uuid_equals(uuid):
1007 return [["_uuid", "==", ["uuid", str(uuid)]]]
1008
26bb0f31 1009
8cdf0349
BP
1010class _InsertedRow(object):
1011 def __init__(self, op_index):
1012 self.op_index = op_index
1013 self.real = None
1014
26bb0f31 1015
8cdf0349 1016class Transaction(object):
2f926787
BP
1017 """A transaction may modify the contents of a database by modifying the
1018 values of columns, deleting rows, inserting rows, or adding checks that
1019 columns in the database have not changed ("verify" operations), through
1020 Row methods.
1021
1022 Reading and writing columns and inserting and deleting rows are all
1023 straightforward. The reasons to verify columns are less obvious.
1024 Verification is the key to maintaining transactional integrity. Because
1025 OVSDB handles multiple clients, it can happen that between the time that
1026 OVSDB client A reads a column and writes a new value, OVSDB client B has
1027 written that column. Client A's write should not ordinarily overwrite
1028 client B's, especially if the column in question is a "map" column that
1029 contains several more or less independent data items. If client A adds a
1030 "verify" operation before it writes the column, then the transaction fails
1031 in case client B modifies it first. Client A will then see the new value
1032 of the column and compose a new transaction based on the new contents
1033 written by client B.
1034
1035 When a transaction is complete, which must be before the next call to
1036 Idl.run(), call Transaction.commit() or Transaction.abort().
1037
1038 The life-cycle of a transaction looks like this:
1039
1040 1. Create the transaction and record the initial sequence number:
1041
1042 seqno = idl.change_seqno(idl)
1043 txn = Transaction(idl)
1044
1045 2. Modify the database with Row and Transaction methods.
1046
1047 3. Commit the transaction by calling Transaction.commit(). The first call
1048 to this function probably returns Transaction.INCOMPLETE. The client
1049 must keep calling again along as this remains true, calling Idl.run() in
1050 between to let the IDL do protocol processing. (If the client doesn't
1051 have anything else to do in the meantime, it can use
1052 Transaction.commit_block() to avoid having to loop itself.)
1053
1054 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
1055 to change from the saved 'seqno' (it's possible that it's already
1056 changed, in which case the client should not wait at all), then start
1057 over from step 1. Only a call to Idl.run() will change the return value
1058 of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""
1059
8cdf0349 1060 # Status values that Transaction.commit() can return.
eda26d40
RB
1061
1062 # Not yet committed or aborted.
1063 UNCOMMITTED = "uncommitted"
1064 # Transaction didn't include any changes.
1065 UNCHANGED = "unchanged"
1066 # Commit in progress, please wait.
1067 INCOMPLETE = "incomplete"
1068 # ovsdb_idl_txn_abort() called.
1069 ABORTED = "aborted"
1070 # Commit successful.
1071 SUCCESS = "success"
1072 # Commit failed because a "verify" operation
1073 # reported an inconsistency, due to a network
1074 # problem, or other transient failure. Wait
1075 # for a change, then try again.
1076 TRY_AGAIN = "try again"
1077 # Server hasn't given us the lock yet.
1078 NOT_LOCKED = "not locked"
1079 # Commit failed due to a hard error.
1080 ERROR = "error"
8cdf0349
BP
1081
1082 @staticmethod
1083 def status_to_string(status):
1084 """Converts one of the status values that Transaction.commit() can
1085 return into a human-readable string.
1086
1087 (The status values are in fact such strings already, so
1088 there's nothing to do.)"""
1089 return status
1090
1091 def __init__(self, idl):
1092 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
1093 A given Idl may only have a single active transaction at a time.
1094
1095 A Transaction may modify the contents of a database by assigning new
1096 values to columns (attributes of Row), deleting rows (with
1097 Row.delete()), or inserting rows (with Transaction.insert()). It may
1098 also check that columns in the database have not changed with
1099 Row.verify().
1100
1101 When a transaction is complete (which must be before the next call to
1102 Idl.run()), call Transaction.commit() or Transaction.abort()."""
1103 assert idl.txn is None
1104
1105 idl.txn = self
1106 self._request_id = None
1107 self.idl = idl
1108 self.dry_run = False
1109 self._txn_rows = {}
1110 self._status = Transaction.UNCOMMITTED
1111 self._error = None
1112 self._comments = []
1113
94fbe1aa 1114 self._inc_row = None
8cdf0349 1115 self._inc_column = None
8cdf0349 1116
80c12152
SA
1117 self._fetch_requests = []
1118
26bb0f31 1119 self._inserted_rows = {} # Map from UUID to _InsertedRow
8cdf0349
BP
1120
1121 def add_comment(self, comment):
80c12152 1122 """Appends 'comment' to the comments that will be passed to the OVSDB
8cdf0349
BP
1123 server when this transaction is committed. (The comment will be
1124 committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
1125 relatively human-readable form.)"""
1126 self._comments.append(comment)
1127
8cdf0349 1128 def wait(self, poller):
2f926787
BP
1129 """Causes poll_block() to wake up if this transaction has completed
1130 committing."""
8cdf0349
BP
1131 if self._status not in (Transaction.UNCOMMITTED,
1132 Transaction.INCOMPLETE):
1133 poller.immediate_wake()
1134
1135 def _substitute_uuids(self, json):
da2d45c6 1136 if isinstance(json, (list, tuple)):
8cdf0349 1137 if (len(json) == 2
897c8064
LS
1138 and json[0] == 'uuid'
1139 and ovs.ovsuuid.is_valid_string(json[1])):
8cdf0349
BP
1140 uuid = ovs.ovsuuid.from_string(json[1])
1141 row = self._txn_rows.get(uuid, None)
1142 if row and row._data is None:
1143 return ["named-uuid", _uuid_name_from_uuid(uuid)]
225b582a
IY
1144 else:
1145 return [self._substitute_uuids(elem) for elem in json]
8cdf0349
BP
1146 return json
1147
1148 def __disassemble(self):
1149 self.idl.txn = None
1150
cb96c1b2 1151 for row in six.itervalues(self._txn_rows):
8cdf0349
BP
1152 if row._changes is None:
1153 row._table.rows[row.uuid] = row
1154 elif row._data is None:
1155 del row._table.rows[row.uuid]
1156 row.__dict__["_changes"] = {}
a59912a0 1157 row.__dict__["_mutations"] = {}
8cdf0349
BP
1158 row.__dict__["_prereqs"] = {}
1159 self._txn_rows = {}
1160
1161 def commit(self):
2f926787
BP
1162 """Attempts to commit 'txn'. Returns the status of the commit
1163 operation, one of the following constants:
1164
1165 Transaction.INCOMPLETE:
1166
1167 The transaction is in progress, but not yet complete. The caller
1168 should call again later, after calling Idl.run() to let the
1169 IDL do OVSDB protocol processing.
1170
1171 Transaction.UNCHANGED:
1172
1173 The transaction is complete. (It didn't actually change the
1174 database, so the IDL didn't send any request to the database
1175 server.)
1176
1177 Transaction.ABORTED:
1178
1179 The caller previously called Transaction.abort().
1180
1181 Transaction.SUCCESS:
1182
1183 The transaction was successful. The update made by the
1184 transaction (and possibly other changes made by other database
1185 clients) should already be visible in the IDL.
1186
1187 Transaction.TRY_AGAIN:
1188
1189 The transaction failed for some transient reason, e.g. because a
1190 "verify" operation reported an inconsistency or due to a network
1191 problem. The caller should wait for a change to the database,
1192 then compose a new transaction, and commit the new transaction.
1193
1194 Use Idl.change_seqno to wait for a change in the database. It is
1195 important to use its value *before* the initial call to
1196 Transaction.commit() as the baseline for this purpose, because
1197 the change that one should wait for can happen after the initial
1198 call but before the call that returns Transaction.TRY_AGAIN, and
1199 using some other baseline value in that situation could cause an
1200 indefinite wait if the database rarely changes.
1201
1202 Transaction.NOT_LOCKED:
1203
1204 The transaction failed because the IDL has been configured to
1205 require a database lock (with Idl.set_lock()) but didn't
1206 get it yet or has already lost it.
8cdf0349
BP
1207
1208 Committing a transaction rolls back all of the changes that it made to
2f926787 1209 the IDL's copy of the database. If the transaction commits
8cdf0349 1210 successfully, then the database server will send an update and, thus,
2f926787 1211 the IDL will be updated with the committed changes."""
8cdf0349
BP
1212 # The status can only change if we're the active transaction.
1213 # (Otherwise, our status will change only in Idl.run().)
1214 if self != self.idl.txn:
1215 return self._status
1216
1217 # If we need a lock but don't have it, give up quickly.
9614403d 1218 if self.idl.lock_name and not self.idl.has_lock:
8cdf0349
BP
1219 self._status = Transaction.NOT_LOCKED
1220 self.__disassemble()
1221 return self._status
1222
1223 operations = [self.idl._db.name]
1224
1225 # Assert that we have the required lock (avoiding a race).
1226 if self.idl.lock_name:
1227 operations.append({"op": "assert",
1228 "lock": self.idl.lock_name})
1229
1230 # Add prerequisites and declarations of new rows.
cb96c1b2 1231 for row in six.itervalues(self._txn_rows):
8cdf0349
BP
1232 if row._prereqs:
1233 rows = {}
1234 columns = []
1235 for column_name in row._prereqs:
1236 columns.append(column_name)
1237 rows[column_name] = row._data[column_name].to_json()
1238 operations.append({"op": "wait",
1239 "table": row._table.name,
1240 "timeout": 0,
1241 "where": _where_uuid_equals(row.uuid),
1242 "until": "==",
1243 "columns": columns,
1244 "rows": [rows]})
1245
1246 # Add updates.
1247 any_updates = False
cb96c1b2 1248 for row in six.itervalues(self._txn_rows):
8cdf0349
BP
1249 if row._changes is None:
1250 if row._table.is_root:
1251 operations.append({"op": "delete",
1252 "table": row._table.name,
1253 "where": _where_uuid_equals(row.uuid)})
1254 any_updates = True
1255 else:
1256 # Let ovsdb-server decide whether to really delete it.
1257 pass
1258 elif row._changes:
1259 op = {"table": row._table.name}
1260 if row._data is None:
1261 op["op"] = "insert"
1262 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
1263 any_updates = True
1264
1265 op_index = len(operations) - 1
1266 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
1267 else:
1268 op["op"] = "update"
1269 op["where"] = _where_uuid_equals(row.uuid)
1270
1271 row_json = {}
1272 op["row"] = row_json
1273
cb96c1b2 1274 for column_name, datum in six.iteritems(row._changes):
8cdf0349 1275 if row._data is not None or not datum.is_default():
26bb0f31 1276 row_json[column_name] = (
897c8064 1277 self._substitute_uuids(datum.to_json()))
8cdf0349
BP
1278
1279 # If anything really changed, consider it an update.
1280 # We can't suppress not-really-changed values earlier
1281 # or transactions would become nonatomic (see the big
1282 # comment inside Transaction._write()).
1283 if (not any_updates and row._data is not None and
897c8064 1284 row._data[column_name] != datum):
8cdf0349
BP
1285 any_updates = True
1286
1287 if row._data is None or row_json:
1288 operations.append(op)
a59912a0
RM
1289 if row._mutations:
1290 addop = False
1291 op = {"table": row._table.name}
1292 op["op"] = "mutate"
b3220c67
AB
1293 if row._data is None:
1294 # New row
1295 op["where"] = self._substitute_uuids(
1296 _where_uuid_equals(row.uuid))
1297 else:
1298 # Existing row
1299 op["where"] = _where_uuid_equals(row.uuid)
a59912a0
RM
1300 op["mutations"] = []
1301 if '_removes' in row._mutations.keys():
1302 for col, dat in six.iteritems(row._mutations['_removes']):
1303 column = row._table.columns[col]
1304 if column.type.is_map():
1305 opdat = ["set"]
330b9c9c 1306 opdat.append(list(dat))
a59912a0
RM
1307 else:
1308 opdat = ["set"]
1309 inner_opdat = []
1310 for ele in dat:
1311 try:
1312 datum = data.Datum.from_python(column.type,
1313 ele, _row_to_uuid)
1314 except error.Error:
1315 return
1316 inner_opdat.append(
1317 self._substitute_uuids(datum.to_json()))
1318 opdat.append(inner_opdat)
1319 mutation = [col, "delete", opdat]
1320 op["mutations"].append(mutation)
1321 addop = True
1322 if '_inserts' in row._mutations.keys():
330b9c9c 1323 for col, val in six.iteritems(row._mutations['_inserts']):
a59912a0
RM
1324 column = row._table.columns[col]
1325 if column.type.is_map():
1326 opdat = ["map"]
330b9c9c
AB
1327 datum = data.Datum.from_python(column.type, val,
1328 _row_to_uuid)
1329 opdat.append(datum.as_list())
a59912a0
RM
1330 else:
1331 opdat = ["set"]
1332 inner_opdat = []
330b9c9c 1333 for ele in val:
a59912a0
RM
1334 try:
1335 datum = data.Datum.from_python(column.type,
1336 ele, _row_to_uuid)
1337 except error.Error:
1338 return
1339 inner_opdat.append(
1340 self._substitute_uuids(datum.to_json()))
1341 opdat.append(inner_opdat)
1342 mutation = [col, "insert", opdat]
1343 op["mutations"].append(mutation)
1344 addop = True
1345 if addop:
1346 operations.append(op)
1347 any_updates = True
8cdf0349 1348
80c12152
SA
1349 if self._fetch_requests:
1350 for fetch in self._fetch_requests:
1351 fetch["index"] = len(operations) - 1
1352 operations.append({"op": "select",
1353 "table": fetch["row"]._table.name,
1354 "where": self._substitute_uuids(
1355 _where_uuid_equals(fetch["row"].uuid)),
1356 "columns": [fetch["column_name"]]})
1357 any_updates = True
1358
8cdf0349 1359 # Add increment.
94fbe1aa 1360 if self._inc_row and any_updates:
8cdf0349
BP
1361 self._inc_index = len(operations) - 1
1362
1363 operations.append({"op": "mutate",
94fbe1aa 1364 "table": self._inc_row._table.name,
26bb0f31 1365 "where": self._substitute_uuids(
94fbe1aa 1366 _where_uuid_equals(self._inc_row.uuid)),
8cdf0349
BP
1367 "mutations": [[self._inc_column, "+=", 1]]})
1368 operations.append({"op": "select",
94fbe1aa 1369 "table": self._inc_row._table.name,
26bb0f31 1370 "where": self._substitute_uuids(
94fbe1aa 1371 _where_uuid_equals(self._inc_row.uuid)),
8cdf0349
BP
1372 "columns": [self._inc_column]})
1373
1374 # Add comment.
1375 if self._comments:
1376 operations.append({"op": "comment",
1377 "comment": "\n".join(self._comments)})
1378
1379 # Dry run?
1380 if self.dry_run:
1381 operations.append({"op": "abort"})
1382
1383 if not any_updates:
1384 self._status = Transaction.UNCHANGED
1385 else:
1386 msg = ovs.jsonrpc.Message.create_request("transact", operations)
1387 self._request_id = msg.id
1388 if not self.idl._session.send(msg):
1389 self.idl._outstanding_txns[self._request_id] = self
1390 self._status = Transaction.INCOMPLETE
1391 else:
854a94d9 1392 self._status = Transaction.TRY_AGAIN
8cdf0349
BP
1393
1394 self.__disassemble()
1395 return self._status
1396
1397 def commit_block(self):
2f926787
BP
1398 """Attempts to commit this transaction, blocking until the commit
1399 either succeeds or fails. Returns the final commit status, which may
1400 be any Transaction.* value other than Transaction.INCOMPLETE.
1401
1402 This function calls Idl.run() on this transaction'ss IDL, so it may
1403 cause Idl.change_seqno to change."""
8cdf0349
BP
1404 while True:
1405 status = self.commit()
1406 if status != Transaction.INCOMPLETE:
1407 return status
1408
1409 self.idl.run()
1410
1411 poller = ovs.poller.Poller()
1412 self.idl.wait(poller)
1413 self.wait(poller)
1414 poller.block()
1415
1416 def get_increment_new_value(self):
2f926787
BP
1417 """Returns the final (incremented) value of the column in this
1418 transaction that was set to be incremented by Row.increment. This
1419 transaction must have committed successfully."""
8cdf0349
BP
1420 assert self._status == Transaction.SUCCESS
1421 return self._inc_new_value
1422
1423 def abort(self):
1424 """Aborts this transaction. If Transaction.commit() has already been
1425 called then the transaction might get committed anyhow."""
1426 self.__disassemble()
1427 if self._status in (Transaction.UNCOMMITTED,
1428 Transaction.INCOMPLETE):
1429 self._status = Transaction.ABORTED
1430
1431 def get_error(self):
1432 """Returns a string representing this transaction's current status,
1433 suitable for use in log messages."""
1434 if self._status != Transaction.ERROR:
1435 return Transaction.status_to_string(self._status)
1436 elif self._error:
1437 return self._error
1438 else:
1439 return "no error details available"
1440
1441 def __set_error_json(self, json):
1442 if self._error is None:
1443 self._error = ovs.json.to_string(json)
1444
1445 def get_insert_uuid(self, uuid):
1446 """Finds and returns the permanent UUID that the database assigned to a
1447 newly inserted row, given the UUID that Transaction.insert() assigned
1448 locally to that row.
1449
1450 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1451 or if it was assigned by that function and then deleted by Row.delete()
1452 within the same transaction. (Rows that are inserted and then deleted
1453 within a single transaction are never sent to the database server, so
1454 it never assigns them a permanent UUID.)
1455
1456 This transaction must have completed successfully."""
1457 assert self._status in (Transaction.SUCCESS,
1458 Transaction.UNCHANGED)
1459 inserted_row = self._inserted_rows.get(uuid)
1460 if inserted_row:
1461 return inserted_row.real
1462 return None
1463
94fbe1aa
BP
1464 def _increment(self, row, column):
1465 assert not self._inc_row
1466 self._inc_row = row
1467 self._inc_column = column
1468
80c12152 1469 def _fetch(self, row, column_name):
a0631d92 1470 self._fetch_requests.append({"row": row, "column_name": column_name})
80c12152 1471
8cdf0349
BP
1472 def _write(self, row, column, datum):
1473 assert row._changes is not None
a59912a0 1474 assert row._mutations is not None
8cdf0349
BP
1475
1476 txn = row._idl.txn
1477
1478 # If this is a write-only column and the datum being written is the
1479 # same as the one already there, just skip the update entirely. This
1480 # is worth optimizing because we have a lot of columns that get
1481 # periodically refreshed into the database but don't actually change
1482 # that often.
1483 #
1484 # We don't do this for read/write columns because that would break
1485 # atomicity of transactions--some other client might have written a
1486 # different value in that column since we read it. (But if a whole
1487 # transaction only does writes of existing values, without making any
1488 # real changes, we will drop the whole transaction later in
1489 # ovsdb_idl_txn_commit().)
37520ab3
RB
1490 if (not column.alert and row._data and
1491 row._data.get(column.name) == datum):
8cdf0349
BP
1492 new_value = row._changes.get(column.name)
1493 if new_value is None or new_value == datum:
1494 return
1495
1496 txn._txn_rows[row.uuid] = row
330b9c9c
AB
1497 if '_inserts' in row._mutations:
1498 row._mutations['_inserts'].pop(column.name, None)
1499 if '_removes' in row._mutations:
1500 row._mutations['_removes'].pop(column.name, None)
1501 row._changes[column.name] = datum.copy()
8cdf0349
BP
1502
1503 def insert(self, table, new_uuid=None):
1504 """Inserts and returns a new row in 'table', which must be one of the
1505 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1506
1507 The new row is assigned a provisional UUID. If 'uuid' is None then one
1508 is randomly generated; otherwise 'uuid' should specify a randomly
1509 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
1510 different UUID when 'txn' is committed, but the IDL will replace any
1511 uses of the provisional UUID in the data to be to be committed by the
1512 UUID assigned by ovsdb-server."""
1513 assert self._status == Transaction.UNCOMMITTED
1514 if new_uuid is None:
1515 new_uuid = uuid.uuid4()
1516 row = Row(self.idl, table, new_uuid, None)
1517 table.rows[row.uuid] = row
1518 self._txn_rows[row.uuid] = row
1519 return row
1520
1521 def _process_reply(self, msg):
1522 if msg.type == ovs.jsonrpc.Message.T_ERROR:
1523 self._status = Transaction.ERROR
da2d45c6 1524 elif not isinstance(msg.result, (list, tuple)):
8cdf0349 1525 # XXX rate-limit
3a656eaf 1526 vlog.warn('reply to "transact" is not JSON array')
8cdf0349
BP
1527 else:
1528 hard_errors = False
1529 soft_errors = False
1530 lock_errors = False
1531
1532 ops = msg.result
1533 for op in ops:
1534 if op is None:
1535 # This isn't an error in itself but indicates that some
1536 # prior operation failed, so make sure that we know about
1537 # it.
1538 soft_errors = True
da2d45c6 1539 elif isinstance(op, dict):
8cdf0349
BP
1540 error = op.get("error")
1541 if error is not None:
1542 if error == "timed out":
1543 soft_errors = True
1544 elif error == "not owner":
1545 lock_errors = True
1546 elif error == "aborted":
1547 pass
1548 else:
1549 hard_errors = True
1550 self.__set_error_json(op)
1551 else:
1552 hard_errors = True
1553 self.__set_error_json(op)
1554 # XXX rate-limit
3a656eaf 1555 vlog.warn("operation reply is not JSON null or object")
8cdf0349
BP
1556
1557 if not soft_errors and not hard_errors and not lock_errors:
94fbe1aa 1558 if self._inc_row and not self.__process_inc_reply(ops):
8cdf0349 1559 hard_errors = True
80c12152
SA
1560 if self._fetch_requests:
1561 if self.__process_fetch_reply(ops):
1562 self.idl.change_seqno += 1
1563 else:
1564 hard_errors = True
8cdf0349 1565
cb96c1b2 1566 for insert in six.itervalues(self._inserted_rows):
8cdf0349
BP
1567 if not self.__process_insert_reply(insert, ops):
1568 hard_errors = True
1569
1570 if hard_errors:
1571 self._status = Transaction.ERROR
1572 elif lock_errors:
1573 self._status = Transaction.NOT_LOCKED
1574 elif soft_errors:
854a94d9 1575 self._status = Transaction.TRY_AGAIN
8cdf0349
BP
1576 else:
1577 self._status = Transaction.SUCCESS
1578
1579 @staticmethod
1580 def __check_json_type(json, types, name):
1581 if not json:
1582 # XXX rate-limit
3a656eaf 1583 vlog.warn("%s is missing" % name)
8cdf0349 1584 return False
da2d45c6 1585 elif not isinstance(json, tuple(types)):
8cdf0349 1586 # XXX rate-limit
3a656eaf 1587 vlog.warn("%s has unexpected type %s" % (name, type(json)))
8cdf0349
BP
1588 return False
1589 else:
1590 return True
1591
80c12152
SA
1592 def __process_fetch_reply(self, ops):
1593 update = False
1594 for fetch_request in self._fetch_requests:
1595 row = fetch_request["row"]
1596 column_name = fetch_request["column_name"]
1597 index = fetch_request["index"]
1598 table = row._table
1599
1600 select = ops[index]
1601 fetched_rows = select.get("rows")
1602 if not Transaction.__check_json_type(fetched_rows, (list, tuple),
1603 '"select" reply "rows"'):
1604 return False
1605 if len(fetched_rows) != 1:
1606 # XXX rate-limit
1607 vlog.warn('"select" reply "rows" has %d elements '
e8049bc5 1608 'instead of 1' % len(fetched_rows))
80c12152
SA
1609 continue
1610 fetched_row = fetched_rows[0]
1611 if not Transaction.__check_json_type(fetched_row, (dict,),
1612 '"select" reply row'):
1613 continue
1614
1615 column = table.columns.get(column_name)
1616 datum_json = fetched_row.get(column_name)
a59912a0 1617 datum = data.Datum.from_json(column.type, datum_json)
80c12152
SA
1618
1619 row._data[column_name] = datum
1620 update = True
1621
1622 return update
1623
8cdf0349
BP
1624 def __process_inc_reply(self, ops):
1625 if self._inc_index + 2 > len(ops):
1626 # XXX rate-limit
3a656eaf
EJ
1627 vlog.warn("reply does not contain enough operations for "
1628 "increment (has %d, needs %d)" %
1629 (len(ops), self._inc_index + 2))
8cdf0349
BP
1630
1631 # We know that this is a JSON object because the loop in
1632 # __process_reply() already checked.
1633 mutate = ops[self._inc_index]
1634 count = mutate.get("count")
8f808842 1635 if not Transaction.__check_json_type(count, six.integer_types,
8cdf0349
BP
1636 '"mutate" reply "count"'):
1637 return False
1638 if count != 1:
1639 # XXX rate-limit
3a656eaf 1640 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
8cdf0349
BP
1641 return False
1642
1643 select = ops[self._inc_index + 1]
1644 rows = select.get("rows")
1645 if not Transaction.__check_json_type(rows, (list, tuple),
1646 '"select" reply "rows"'):
1647 return False
1648 if len(rows) != 1:
1649 # XXX rate-limit
3a656eaf
EJ
1650 vlog.warn('"select" reply "rows" has %d elements '
1651 'instead of 1' % len(rows))
8cdf0349
BP
1652 return False
1653 row = rows[0]
1654 if not Transaction.__check_json_type(row, (dict,),
1655 '"select" reply row'):
1656 return False
1657 column = row.get(self._inc_column)
8f808842 1658 if not Transaction.__check_json_type(column, six.integer_types,
8cdf0349
BP
1659 '"select" reply inc column'):
1660 return False
1661 self._inc_new_value = column
1662 return True
1663
1664 def __process_insert_reply(self, insert, ops):
1665 if insert.op_index >= len(ops):
1666 # XXX rate-limit
3a656eaf
EJ
1667 vlog.warn("reply does not contain enough operations "
1668 "for insert (has %d, needs %d)"
1669 % (len(ops), insert.op_index))
8cdf0349
BP
1670 return False
1671
1672 # We know that this is a JSON object because the loop in
1673 # __process_reply() already checked.
1674 reply = ops[insert.op_index]
1675 json_uuid = reply.get("uuid")
1676 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1677 '"insert" reply "uuid"'):
1678 return False
1679
1680 try:
1681 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1682 except error.Error:
1683 # XXX rate-limit
3a656eaf 1684 vlog.warn('"insert" reply "uuid" is not a JSON UUID')
8cdf0349 1685 return False
bf6ec045 1686
8cdf0349
BP
1687 insert.real = uuid_
1688 return True
ad0991e6
EJ
1689
1690
1691class SchemaHelper(object):
1692 """IDL Schema helper.
1693
1694 This class encapsulates the logic required to generate schemas suitable
1695 for creating 'ovs.db.idl.Idl' objects. Clients should register columns
1696 they are interested in using register_columns(). When finished, the
1697 get_idl_schema() function may be called.
1698
1699 The location on disk of the schema used may be found in the
1700 'schema_location' variable."""
1701
e15ad8e6
IY
1702 def __init__(self, location=None, schema_json=None):
1703 """Creates a new Schema object.
ad0991e6 1704
e15ad8e6
IY
1705 'location' file path to ovs schema. None means default location
1706 'schema_json' schema in json preresentation in memory
1707 """
1708
1709 if location and schema_json:
1710 raise ValueError("both location and schema_json can't be "
1711 "specified. it's ambiguous.")
1712 if schema_json is None:
1713 if location is None:
1714 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1715 schema_json = ovs.json.from_file(location)
ad0991e6 1716
e15ad8e6 1717 self.schema_json = schema_json
ad0991e6 1718 self._tables = {}
80c12152 1719 self._readonly = {}
bf42f674 1720 self._all = False
ad0991e6 1721
80c12152 1722 def register_columns(self, table, columns, readonly=[]):
ad0991e6
EJ
1723 """Registers interest in the given 'columns' of 'table'. Future calls
1724 to get_idl_schema() will include 'table':column for each column in
1725 'columns'. This function automatically avoids adding duplicate entries
1726 to the schema.
80c12152
SA
1727 A subset of 'columns' can be specified as 'readonly'. The readonly
1728 columns are not replicated but can be fetched on-demand by the user
1729 with Row.fetch().
ad0991e6
EJ
1730
1731 'table' must be a string.
1732 'columns' must be a list of strings.
80c12152 1733 'readonly' must be a list of strings.
ad0991e6
EJ
1734 """
1735
da2d45c6
RB
1736 assert isinstance(table, six.string_types)
1737 assert isinstance(columns, list)
ad0991e6
EJ
1738
1739 columns = set(columns) | self._tables.get(table, set())
1740 self._tables[table] = columns
80c12152 1741 self._readonly[table] = readonly
ad0991e6 1742
7698e31d
IY
1743 def register_table(self, table):
1744 """Registers interest in the given all columns of 'table'. Future calls
1745 to get_idl_schema() will include all columns of 'table'.
1746
1747 'table' must be a string
1748 """
da2d45c6 1749 assert isinstance(table, six.string_types)
7698e31d
IY
1750 self._tables[table] = set() # empty set means all columns in the table
1751
bf42f674
EJ
1752 def register_all(self):
1753 """Registers interest in every column of every table."""
1754 self._all = True
1755
ad0991e6
EJ
1756 def get_idl_schema(self):
1757 """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1758 object based on columns registered using the register_columns()
1759 function."""
1760
e15ad8e6
IY
1761 schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1762 self.schema_json = None
ad0991e6 1763
bf42f674
EJ
1764 if not self._all:
1765 schema_tables = {}
cb96c1b2 1766 for table, columns in six.iteritems(self._tables):
bf42f674
EJ
1767 schema_tables[table] = (
1768 self._keep_table_columns(schema, table, columns))
1769
1770 schema.tables = schema_tables
80c12152 1771 schema.readonly = self._readonly
ad0991e6
EJ
1772 return schema
1773
1774 def _keep_table_columns(self, schema, table_name, columns):
1775 assert table_name in schema.tables
1776 table = schema.tables[table_name]
1777
7698e31d
IY
1778 if not columns:
1779 # empty set means all columns in the table
1780 return table
1781
ad0991e6
EJ
1782 new_columns = {}
1783 for column_name in columns:
da2d45c6 1784 assert isinstance(column_name, six.string_types)
ad0991e6
EJ
1785 assert column_name in table.columns
1786
1787 new_columns[column_name] = table.columns[column_name]
1788
1789 table.columns = new_columns
1790 return table