]> git.proxmox.com Git - mirror_ovs.git/blame - python/ovs/db/idl.py
dpdk: Use VLOG_INFO_ONCE instead of open-coding it.
[mirror_ovs.git] / python / ovs / db / idl.py
CommitLineData
0164e367 1# Copyright (c) 2009, 2010, 2011, 2012, 2013, 2016 Nicira, Inc.
99155935
BP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at:
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
fbafc3c2 15import functools
8cdf0349 16import uuid
99155935 17
cb96c1b2
RB
18import six
19
99155935 20import ovs.jsonrpc
a59912a0 21import ovs.db.data as data
4c0f6271 22import ovs.db.parser
99155935
BP
23import ovs.db.schema
24from ovs.db import error
25import ovs.ovsuuid
8cdf0349 26import ovs.poller
3a656eaf
EJ
27import ovs.vlog
28
# Module-level logger used by everything in this file.
vlog = ovs.vlog.Vlog("idl")

__pychecker__ = 'no-classattr no-objattrs'

# Event names passed as the 'event' argument of Idl.notify().
ROW_CREATE = "create"
ROW_UPDATE = "update"
ROW_DELETE = "delete"

# Format selectors handed to Idl.__parse_update(): OVSDB_UPDATE is used for
# "update" notifications and "monitor" replies, OVSDB_UPDATE2 for "update2"
# notifications and "monitor_cond" replies.
OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1
39
d7d417fc
TW
40
class Idl(object):
    """Open vSwitch Database Interface Definition Language (OVSDB IDL).

    The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
    requests to an OVSDB database server and parses the responses, converting
    raw JSON into data structures that are easier for clients to digest.

    The IDL also assists with issuing database transactions.  The client
    creates a transaction, manipulates the IDL data structures, and commits or
    aborts the transaction.  The IDL then composes and issues the necessary
    JSON-RPC requests and reports to the client whether the transaction
    completed successfully.

    The client is allowed to access the following attributes directly, in a
    read-only fashion:

    - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
      to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
      annotated with a new attribute 'rows', which is a dict from a uuid.UUID
      to a Row object.

      The client may directly read and write the Row objects referenced by the
      'rows' map values.  Refer to Row for more details.

    - 'change_seqno': A number that represents the IDL's state.  When the IDL
      is updated (by Idl.run()), its value changes.  The sequence number can
      occasionally change even if the database does not.  This happens if the
      connection to the database drops and reconnects, which causes the
      database contents to be reloaded even if they didn't change.  (It could
      also happen if the database server sends out a "change" that reflects
      what the IDL already thought was in the database.  The database server is
      not supposed to do that, but bugs could in theory cause it to do so.)

    - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
      if no lock is configured.

    - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
      lock, and False otherwise.

      Locking and unlocking happens asynchronously from the database client's
      point of view, so the information is only useful for optimization
      (e.g. if the client doesn't have the lock then there's no point in trying
      to write to the database).

    - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
      the database server has indicated that some other client already owns the
      requested lock, and False otherwise.

    - 'txn': The ovs.db.idl.Transaction object for the database transaction
      currently being constructed, if there is one, or None otherwise.
    """

    # Values of self.state: which kind of monitor request, if any, has been
    # issued so far.
    IDL_S_INITIAL = 0
    IDL_S_MONITOR_REQUESTED = 1
    IDL_S_MONITOR_COND_REQUESTED = 2

    def __init__(self, remote, schema):
        """Creates a connection to the database described by 'schema' on
        'remote', which should be in a form acceptable to
        ovs.jsonrpc.Session.open().  The connection will maintain an in-memory
        replica of the remote database.

        'schema' must be an instance of the SchemaHelper class; the schema
        actually used is the one returned by schema.get_idl_schema().  Only
        the tables and columns present in that schema are replicated.  If a
        column carries an attribute named 'alert' set to False, then changes
        to that column will not be considered changes to the database for the
        purpose of the return value of Idl.run() and Idl.change_seqno.  This
        is useful for columns that the IDL's client will write but not read.
        Columns without an 'alert' attribute get alert = True.

        The IDL uses and modifies the schema object directly (each table is
        annotated with 'rows', 'idl', 'condition', and related attributes)."""

        assert isinstance(schema, SchemaHelper)
        schema = schema.get_idl_schema()

        self.tables = schema.tables
        self.readonly = schema.readonly
        self._db = schema
        self._session = ovs.jsonrpc.Session.open(remote)
        self._monitor_request_id = None
        self._last_seqno = None
        self.change_seqno = 0
        self.uuid = uuid.uuid1()
        self.state = self.IDL_S_INITIAL

        # Database locking.
        self.lock_name = None           # Name of lock we need, None if none.
        self.has_lock = False           # Has db server said we have the lock?
        self.is_lock_contended = False  # Has db server said we can't get lock?
        self._lock_request_id = None    # JSON-RPC ID of in-flight lock request.

        # Transaction support.
        self.txn = None
        self._outstanding_txns = {}

        for table in six.itervalues(schema.tables):
            for column in six.itervalues(table.columns):
                if not hasattr(column, 'alert'):
                    column.alert = True
            table.need_table = False
            table.rows = {}
            table.idl = self
            table.condition = [True]
            table.cond_changed = False

    def close(self):
        """Closes the connection to the database.  The IDL will no longer
        update."""
        self._session.close()

    def run(self):
        """Processes a batch of messages from the database server.  Returns
        True if the database as seen through the IDL changed, False if it did
        not change.  The initial fetch of the entire contents of the remote
        database is considered to be one kind of change.  If the IDL has been
        configured to acquire a database lock (with Idl.set_lock()), then
        successfully acquiring the lock is also considered to be a change.

        This function can return occasional false positives, that is, report
        that the database changed even though it didn't.  This happens if the
        connection to the database drops and reconnects, which causes the
        database contents to be reloaded even if they didn't change.  (It could
        also happen if the database server sends out a "change" that reflects
        what we already thought was in the database, but the database server is
        not supposed to do that.)

        As an alternative to checking the return value, the client may check
        for changes in self.change_seqno."""
        assert not self.txn
        initial_change_seqno = self.change_seqno

        self.send_cond_change()
        self._session.run()

        # At most 50 iterations (and therefore at most 50 messages) per call.
        for _ in range(50):
            if not self._session.is_connected():
                break

            seqno = self._session.get_seqno()
            if seqno != self._last_seqno:
                # The session sequence number changed since we last looked:
                # abort in-flight transactions and request the contents again.
                self._last_seqno = seqno
                self.__txn_abort_all()
                self.__send_monitor_request()
                if self.lock_name:
                    self.__send_lock_request()
                break

            msg = self._session.recv()
            if msg is None:
                break
            if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update2"
                    and len(msg.params) == 2):
                # Database contents changed.
                self.__parse_update(msg.params[1], OVSDB_UPDATE2)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update"
                    and len(msg.params) == 2):
                # Database contents changed.
                self.__parse_update(msg.params[1], OVSDB_UPDATE)
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._monitor_request_id is not None
                    and self._monitor_request_id == msg.id):
                # Reply to our "monitor" request.
                try:
                    self.change_seqno += 1
                    self._monitor_request_id = None
                    self.__clear()
                    if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
                        self.__parse_update(msg.result, OVSDB_UPDATE2)
                    else:
                        assert self.state == self.IDL_S_MONITOR_REQUESTED
                        self.__parse_update(msg.result, OVSDB_UPDATE)

                except error.Error as e:
                    vlog.err("%s: parse error in received schema: %s"
                             % (self._session.get_name(), e))
                    self.__error()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._lock_request_id is not None
                    and self._lock_request_id == msg.id):
                # Reply to our "lock" request.
                self.__parse_lock_reply(msg.result)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "locked"):
                # We got our lock.
                self.__parse_lock_notify(msg.params, True)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "stolen"):
                # Someone else stole our lock.
                self.__parse_lock_notify(msg.params, False)
            elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
                # Reply to our echo request.  Ignore it.
                # NOTE(review): this tests T_NOTIFY although the comment
                # speaks of a reply; left unchanged -- confirm intent.
                pass
            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                    self.state == self.IDL_S_MONITOR_COND_REQUESTED and
                    self._monitor_request_id == msg.id):
                # Our "monitor_cond" request was rejected.  If the server
                # does not know the method, retry; __send_monitor_request()
                # uses plain "monitor" once the state has left INITIAL.
                if msg.error == "unknown method":
                    self.__send_monitor_request()
            elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                               ovs.jsonrpc.Message.T_REPLY)
                    and self.__txn_process_reply(msg)):
                # __txn_process_reply() did everything needed.
                pass
            else:
                # This can happen if a transaction is destroyed before we
                # receive the reply, so keep the log level low.
                vlog.dbg("%s: received unexpected %s message"
                         % (self._session.get_name(),
                            ovs.jsonrpc.Message.type_to_string(msg.type)))

        return initial_change_seqno != self.change_seqno

    def send_cond_change(self):
        """Sends a "monitor_cond_change" request for every table whose
        condition was changed by cond_change() and not yet sent.  Does nothing
        while the session is not connected."""
        if not self._session.is_connected():
            return

        for table in six.itervalues(self.tables):
            if table.cond_changed:
                self.__send_cond_change(table, table.condition)
                table.cond_changed = False

    def cond_change(self, table_name, cond):
        """Sets the condition for 'table_name' to 'cond', which should be a
        conditional expression suitable for use directly in the OVSDB
        protocol, with the exception that the empty condition []
        matches no rows (instead of matching every row).  That is, []
        is equivalent to [False], not to [True].

        Raises error.Error if 'table_name' is not a known table.  The new
        condition is only recorded here; it is sent by send_cond_change().
        """

        table = self.tables.get(table_name)
        if not table:
            raise error.Error('Unknown table "%s"' % table_name)

        if cond == []:
            cond = [False]
        if table.condition != cond:
            table.condition = cond
            table.cond_changed = True

    def wait(self, poller):
        """Arranges for poller.block() to wake up when self.run() has something
        to do or when activity occurs on a transaction on 'self'."""
        self._session.wait(poller)
        self._session.recv_wait(poller)

    def has_ever_connected(self):
        """Returns True, if the IDL successfully connected to the remote
        database and retrieved its contents (even if the connection
        subsequently dropped and is in the process of reconnecting).  If so,
        then the IDL contains an atomic snapshot of the database's contents
        (but it might be arbitrarily old if the connection dropped).

        Returns False if the IDL has never connected or retrieved the
        database's contents.  If so, the IDL is empty."""
        return self.change_seqno != 0

    def force_reconnect(self):
        """Forces the IDL to drop its connection to the database and reconnect.
        In the meantime, the contents of the IDL will not change."""
        self._session.force_reconnect()

    def set_lock(self, lock_name):
        """If 'lock_name' is not None, configures the IDL to obtain the named
        lock from the database server and to avoid modifying the database when
        the lock cannot be acquired (that is, when another client has the same
        lock).

        If 'lock_name' is None, drops the locking requirement and releases the
        lock."""
        assert not self.txn
        assert not self._outstanding_txns

        if self.lock_name and (not lock_name or lock_name != self.lock_name):
            # Release previous lock.
            self.__send_unlock_request()
            self.lock_name = None
            self.is_lock_contended = False

        if lock_name and not self.lock_name:
            # Acquire new lock.
            self.lock_name = lock_name
            self.__send_lock_request()

    def notify(self, event, row, updates=None):
        """Hook for implementing create/update/delete notifications

        :param event:   The event that was triggered
        :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
        :param row:     The row as it is after the operation has occurred
        :type row:      Row
        :param updates: For updates, row with only old values of the changed
                        columns
        :type updates:  Row
        """

    def __send_cond_change(self, table, cond):
        """Sends a "monitor_cond_change" request that sets the condition of
        'table' to 'cond'.  A fresh self.uuid is generated; the request
        carries both the previous and the new value."""
        monitor_cond_change = {table.name: [{"where": cond}]}
        old_uuid = str(self.uuid)
        self.uuid = uuid.uuid1()
        params = [old_uuid, str(self.uuid), monitor_cond_change]
        msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
        self._session.send(msg)

    def __clear(self):
        """Drops every replicated row.  Increments change_seqno if any table
        actually held rows."""
        changed = False

        for table in six.itervalues(self.tables):
            if table.rows:
                changed = True
                table.rows = {}

        if changed:
            self.change_seqno += 1

    def __update_has_lock(self, new_has_lock):
        """Records whether we hold the lock.  Newly acquiring it counts as a
        database change unless a monitor reply is still pending."""
        if new_has_lock and not self.has_lock:
            if self._monitor_request_id is None:
                self.change_seqno += 1
            else:
                # We're waiting for a monitor reply, so don't signal that the
                # database changed.  The monitor reply will increment
                # change_seqno anyhow.
                pass
            self.is_lock_contended = False
        self.has_lock = new_has_lock

    def __do_send_lock_request(self, method):
        """Sends 'method' ("lock" or "unlock") for self.lock_name if the
        session is connected.  Returns the request's JSON-RPC id, or None if
        nothing was sent."""
        self.__update_has_lock(False)
        self._lock_request_id = None
        if self._session.is_connected():
            msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
            msg_id = msg.id
            self._session.send(msg)
        else:
            msg_id = None
        return msg_id

    def __send_lock_request(self):
        """Requests the lock and remembers the request id for run()."""
        self._lock_request_id = self.__do_send_lock_request("lock")

    def __send_unlock_request(self):
        """Releases the lock; the reply id is not tracked."""
        self.__do_send_lock_request("unlock")

    def __parse_lock_reply(self, result):
        """Handles the reply to our "lock" request: we own the lock only if
        'result' is a dict whose "locked" member is exactly True."""
        self._lock_request_id = None
        got_lock = isinstance(result, dict) and result.get("locked") is True
        self.__update_has_lock(got_lock)
        if not got_lock:
            self.is_lock_contended = True

    def __parse_lock_notify(self, params, new_has_lock):
        """Handles a "locked" (new_has_lock=True) or "stolen"
        (new_has_lock=False) notification.  Notifications whose first
        parameter is not our lock name are ignored."""
        if (self.lock_name is not None
                and isinstance(params, (list, tuple))
                and params
                and params[0] == self.lock_name):
            self.__update_has_lock(new_has_lock)
            if not new_has_lock:
                self.is_lock_contended = True

    def __send_monitor_request(self):
        """Sends a monitor request covering every non-readonly column of
        every table.  The first request ever sent uses "monitor_cond"; every
        later one uses plain "monitor"."""
        if self.state == self.IDL_S_INITIAL:
            self.state = self.IDL_S_MONITOR_COND_REQUESTED
            method = "monitor_cond"
        else:
            self.state = self.IDL_S_MONITOR_REQUESTED
            method = "monitor"

        monitor_requests = {}
        for table in six.itervalues(self.tables):
            columns = []
            for column in six.iterkeys(table.columns):
                if (table.name not in self.readonly
                        or column not in self.readonly[table.name]):
                    columns.append(column)
            monitor_requests[table.name] = {"columns": columns}
            if method == "monitor_cond" and table.condition != [True]:
                monitor_requests[table.name]["where"] = table.condition
                # The condition travels with this request, so there is
                # nothing left for send_cond_change() to send.
                table.cond_changed = False

        msg = ovs.jsonrpc.Message.create_request(
            method, [self._db.name, str(self.uuid), monitor_requests])
        self._monitor_request_id = msg.id
        self._session.send(msg)

    def __parse_update(self, update, version):
        """Applies 'update' to the replica, logging (not raising) any
        error.Error encountered while parsing it."""
        try:
            self.__do_parse_update(update, version)
        except error.Error as e:
            vlog.err("%s: error parsing update: %s"
                     % (self._session.get_name(), e))

    def __do_parse_update(self, table_updates, version):
        """Validates and applies a <table-updates> object.  'version' is
        OVSDB_UPDATE or OVSDB_UPDATE2 and selects the row-update format.
        Raises error.Error on malformed input; change_seqno is incremented
        once per row update that reports a change."""
        if not isinstance(table_updates, dict):
            raise error.Error("<table-updates> is not an object",
                              table_updates)

        for table_name, table_update in six.iteritems(table_updates):
            table = self.tables.get(table_name)
            if not table:
                raise error.Error('<table-updates> includes unknown '
                                  'table "%s"' % table_name)

            if not isinstance(table_update, dict):
                raise error.Error('<table-update> for table "%s" is not '
                                  'an object' % table_name, table_update)

            for uuid_string, row_update in six.iteritems(table_update):
                if not ovs.ovsuuid.is_valid_string(uuid_string):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains bad UUID "%s" as member '
                                      'name' % (table_name, uuid_string),
                                      table_update)
                # Named row_uuid so the 'uuid' module is not shadowed.
                row_uuid = ovs.ovsuuid.from_string(uuid_string)

                if not isinstance(row_update, dict):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains <row-update> for %s that '
                                      'is not an object'
                                      % (table_name, uuid_string))

                if version == OVSDB_UPDATE2:
                    if self.__process_update2(table, row_uuid, row_update):
                        self.change_seqno += 1
                    continue

                parser = ovs.db.parser.Parser(row_update, "row-update")
                old = parser.get_optional("old", [dict])
                new = parser.get_optional("new", [dict])
                parser.finish()

                if not old and not new:
                    raise error.Error('<row-update> missing "old" and '
                                      '"new" members', row_update)

                if self.__process_update(table, row_uuid, old, new):
                    self.change_seqno += 1

    def __process_update2(self, table, uuid, row_update):
        """Applies one "update2"-format row update ("delete", "insert",
        "initial", or "modify").  Returns True if the replica changed.
        Raises error.Error for a "modify" of an unknown row or for an
        unrecognized operation."""
        row = table.rows.get(uuid)
        changed = False
        if "delete" in row_update:
            if row:
                del table.rows[uuid]
                self.notify(ROW_DELETE, row)
                changed = True
            else:
                # XXX rate-limit
                vlog.warn("cannot delete missing row %s from table "
                          "%s" % (uuid, table.name))
        elif "insert" in row_update or "initial" in row_update:
            if row:
                vlog.warn("cannot add existing row %s from table"
                          " %s" % (uuid, table.name))
                del table.rows[uuid]
            row = self.__create_row(table, uuid)
            if "insert" in row_update:
                row_update = row_update['insert']
            else:
                row_update = row_update['initial']
            self.__add_default(table, row_update)
            if self.__row_update(table, row, row_update):
                changed = True
                self.notify(ROW_CREATE, row)
        elif "modify" in row_update:
            if not row:
                raise error.Error('Modify non-existing row')

            old_row_diff_json = self.__apply_diff(table, row,
                                                  row_update['modify'])
            self.notify(ROW_UPDATE, row,
                        Row.from_json(self, table, uuid, old_row_diff_json))
            changed = True
        else:
            raise error.Error('<row-update> unknown operation',
                              row_update)
        return changed

    def __process_update(self, table, uuid, old, new):
        """Returns True if a column changed, False otherwise."""
        row = table.rows.get(uuid)
        changed = False
        if not new:
            # Delete row.
            if row:
                del table.rows[uuid]
                changed = True
                self.notify(ROW_DELETE, row)
            else:
                # XXX rate-limit
                vlog.warn("cannot delete missing row %s from table %s"
                          % (uuid, table.name))
        elif not old:
            # Insert row.
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
            else:
                # XXX rate-limit
                vlog.warn("cannot add existing row %s to table %s"
                          % (uuid, table.name))
            if self.__row_update(table, row, new):
                changed = True
                self.notify(ROW_CREATE, row)
        else:
            op = ROW_UPDATE
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
                op = ROW_CREATE
                # XXX rate-limit
                vlog.warn("cannot modify missing row %s in table %s"
                          % (uuid, table.name))
            if self.__row_update(table, row, new):
                changed = True
                self.notify(op, row, Row.from_json(self, table, uuid, old))
        return changed

    def __column_name(self, column):
        """Returns the default value of 'column''s key type, converted with
        ovs.ovsuuid.to_json() for UUID-typed columns.  (Despite its name,
        this does not return a column name.)"""
        if column.type.key.type == ovs.db.types.UuidType:
            return ovs.ovsuuid.to_json(column.type.key.type.default)
        else:
            return column.type.key.type.default

    def __add_default(self, table, row_update):
        """Fills 'row_update' in place with a default for each monitored
        (non-readonly) column it omits, provided the column requires at least
        one value (n_min != 0) and is not a map."""
        for column in six.itervalues(table.columns):
            if column.name in row_update:
                continue
            if (table.name in self.readonly
                    and column.name in self.readonly[table.name]):
                continue
            if column.type.n_min != 0 and not column.type.is_map():
                row_update[column.name] = self.__column_name(column)

    def __apply_diff(self, table, row, row_diff):
        """Applies the per-column diffs in 'row_diff' to 'row'.  Returns a
        dict mapping each processed column name to the JSON form of its
        previous value.  Unknown or unparsable columns are logged and
        skipped."""
        old_row_diff_json = {}
        for column_name, datum_diff_json in six.iteritems(row_diff):
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum_diff = data.Datum.from_json(column.type, datum_diff_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            old_row_diff_json[column_name] = row._data[column_name].to_json()
            datum = row._data[column_name].diff(datum_diff)
            if datum != row._data[column_name]:
                row._data[column_name] = datum

        return old_row_diff_json

    def __row_update(self, table, row, row_json):
        """Stores the column values in 'row_json' into 'row'.  Returns True
        if any column whose 'alert' attribute is true changed value.  Unknown
        or unparsable columns are logged and skipped."""
        changed = False
        for column_name, datum_json in six.iteritems(row_json):
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum = data.Datum.from_json(column.type, datum_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            if datum != row._data[column_name]:
                row._data[column_name] = datum
                if column.alert:
                    changed = True
            else:
                # Didn't really change but the OVSDB monitor protocol always
                # includes every value in a row.
                pass
        return changed

    def __create_row(self, table, uuid):
        """Creates a Row for 'uuid' with every column set to its type's
        default, stores it in table.rows, and returns it."""
        # Named row_data so the module-level 'data' alias is not shadowed.
        row_data = {}
        for column in six.itervalues(table.columns):
            row_data[column.name] = ovs.db.data.Datum.default(column.type)
        row = table.rows[uuid] = Row(self, table, uuid, row_data)
        return row

    def __error(self):
        """Recovers from a protocol-level error by forcing a reconnect."""
        self._session.force_reconnect()

    def __txn_abort_all(self):
        """Marks every outstanding transaction TRY_AGAIN and forgets it."""
        while self._outstanding_txns:
            txn = self._outstanding_txns.popitem()[1]
            txn._status = Transaction.TRY_AGAIN

    def __txn_process_reply(self, msg):
        """Hands 'msg' to the outstanding transaction with the same id, if
        any.  Returns True if such a transaction consumed the message, False
        otherwise."""
        txn = self._outstanding_txns.pop(msg.id, None)
        if txn:
            txn._process_reply(msg)
            return True
        return False
8cdf0349 653
26bb0f31 654
8cdf0349
BP
655def _uuid_to_row(atom, base):
656 if base.ref_table:
657 return base.ref_table.rows.get(atom)
658 else:
659 return atom
660
26bb0f31 661
def _row_to_uuid(value):
    """Returns value.uuid when 'value' is a Row; any other value is returned
    unchanged."""
    return value.uuid if isinstance(value, Row) else value
bf6ec045 667
26bb0f31 668
fbafc3c2 669@functools.total_ordering
bf6ec045 670class Row(object):
8cdf0349
BP
671 """A row within an IDL.
672
673 The client may access the following attributes directly:
674
675 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
676
677 - An attribute for each column in the Row's table, named for the column,
678 whose values are as returned by Datum.to_python() for the column's type.
679
680 If some error occurs (e.g. the database server's idea of the column is
681 different from the IDL's idea), then the attribute value is the
682 "default" value returned by Datum.default() for the column's type. (It is
683 important to know this because the default value may violate constraints
684 for the column's type, e.g. the default integer value is 0 even if column
685 constraints require the column's value to be positive.)
686
687 When a transaction is active, column attributes may also be assigned new
688 values. Committing the transaction will then cause the new value to be
689 stored into the database.
690
691 *NOTE*: In the current implementation, the value of a column is a *copy*
692 of the value in the database. This means that modifying its value
693 directly will have no useful effect. For example, the following:
694 row.mycolumn["a"] = "b" # don't do this
695 will not change anything in the database, even after commit. To modify
696 the column, instead assign the modified column value back to the column:
697 d = row.mycolumn
698 d["a"] = "b"
699 row.mycolumn = d
700"""
701 def __init__(self, idl, table, uuid, data):
702 # All of the explicit references to self.__dict__ below are required
703 # to set real attributes with invoking self.__getattr__().
704 self.__dict__["uuid"] = uuid
705
706 self.__dict__["_idl"] = idl
707 self.__dict__["_table"] = table
708
709 # _data is the committed data. It takes the following values:
710 #
711 # - A dictionary that maps every column name to a Datum, if the row
712 # exists in the committed form of the database.
713 #
714 # - None, if this row is newly inserted within the active transaction
715 # and thus has no committed form.
716 self.__dict__["_data"] = data
717
718 # _changes describes changes to this row within the active transaction.
719 # It takes the following values:
720 #
721 # - {}, the empty dictionary, if no transaction is active or if the
722 # row has yet not been changed within this transaction.
723 #
724 # - A dictionary that maps a column name to its new Datum, if an
725 # active transaction changes those columns' values.
726 #
727 # - A dictionary that maps every column name to a Datum, if the row
728 # is newly inserted within the active transaction.
729 #
730 # - None, if this transaction deletes this row.
731 self.__dict__["_changes"] = {}
732
a59912a0
RM
733 # _mutations describes changes to this row to be handled via a
734 # mutate operation on the wire. It takes the following values:
735 #
736 # - {}, the empty dictionary, if no transaction is active or if the
737 # row has yet not been mutated within this transaction.
738 #
739 # - A dictionary that contains two keys:
740 #
741 # - "_inserts" contains a dictionary that maps column names to
742 # new keys/key-value pairs that should be inserted into the
743 # column
744 # - "_removes" contains a dictionary that maps column names to
745 # the keys/key-value pairs that should be removed from the
746 # column
747 #
748 # - None, if this transaction deletes this row.
749 self.__dict__["_mutations"] = {}
750
8cdf0349
BP
751 # A dictionary whose keys are the names of columns that must be
752 # verified as prerequisites when the transaction commits. The values
753 # in the dictionary are all None.
754 self.__dict__["_prereqs"] = {}
755
fbafc3c2
RB
756 def __lt__(self, other):
757 if not isinstance(other, Row):
758 return NotImplemented
759 return bool(self.__dict__['uuid'] < other.__dict__['uuid'])
760
761 def __eq__(self, other):
762 if not isinstance(other, Row):
763 return NotImplemented
764 return bool(self.__dict__['uuid'] == other.__dict__['uuid'])
765
766 def __hash__(self):
767 return int(self.__dict__['uuid'])
768
8cdf0349
BP
769 def __getattr__(self, column_name):
770 assert self._changes is not None
a59912a0 771 assert self._mutations is not None
8cdf0349 772
2d54d801 773 column = self._table.columns[column_name]
8cdf0349 774 datum = self._changes.get(column_name)
a59912a0
RM
775 inserts = None
776 if '_inserts' in self._mutations.keys():
777 inserts = self._mutations['_inserts'].get(column_name)
778 removes = None
779 if '_removes' in self._mutations.keys():
780 removes = self._mutations['_removes'].get(column_name)
8cdf0349 781 if datum is None:
3b4c362f 782 if self._data is None:
a59912a0
RM
783 if inserts is None:
784 raise AttributeError("%s instance has no attribute '%s'" %
785 (self.__class__.__name__,
786 column_name))
787 else:
2d54d801
AB
788 datum = data.Datum.from_python(column.type,
789 inserts,
790 _row_to_uuid)
791 elif column_name in self._data:
80c12152 792 datum = self._data[column_name]
2d54d801
AB
793 if column.type.is_set():
794 dlist = datum.as_list()
a59912a0 795 if inserts is not None:
2d54d801 796 dlist.extend(list(inserts))
a59912a0 797 if removes is not None:
2d54d801
AB
798 removes_datum = data.Datum.from_python(column.type,
799 removes,
800 _row_to_uuid)
801 removes_list = removes_datum.as_list()
802 dlist = [x for x in dlist if x not in removes_list]
803 datum = data.Datum.from_python(column.type, dlist,
804 _row_to_uuid)
805 elif column.type.is_map():
806 dmap = datum.to_python(_uuid_to_row)
a59912a0 807 if inserts is not None:
2d54d801 808 dmap.update(inserts)
a59912a0 809 if removes is not None:
2d54d801
AB
810 for key in removes:
811 if key not in (inserts or {}):
812 del dmap[key]
813 datum = data.Datum.from_python(column.type, dmap,
814 _row_to_uuid)
80c12152 815 else:
a59912a0
RM
816 if inserts is None:
817 raise AttributeError("%s instance has no attribute '%s'" %
818 (self.__class__.__name__,
819 column_name))
820 else:
821 datum = inserts
8cdf0349
BP
822
823 return datum.to_python(_uuid_to_row)
824
def __setattr__(self, column_name, value):
    """Writes 'value' to column 'column_name' within the active transaction.

    Requires an active transaction on the owning IDL.  A write to a readonly
    column, or of a value that cannot be converted to the column's type, is
    logged and otherwise ignored."""
    assert self._changes is not None
    assert self._idl.txn

    table_name = self._table.name
    if table_name in self._idl.readonly:
        if column_name in self._idl.readonly[table_name]:
            vlog.warn("attempting to write to readonly column %s"
                      % column_name)
            return

    column = self._table.columns[column_name]
    try:
        datum = data.Datum.from_python(column.type, value, _row_to_uuid)
    except error.Error as e:
        # XXX rate-limit
        vlog.err("attempting to write bad value to column %s (%s)"
                 % (column_name, e))
    else:
        self._idl.txn._write(self, column, datum)
844
a59912a0
RM
def addvalue(self, column_name, key):
    """Queues the insertion of 'key' into column 'column_name' of this row.

    The row is registered with the current transaction, and 'key' is
    recorded in the row's pending '_inserts' mutations.  A 'key' that cannot
    be converted to the column's type is logged and not recorded."""
    self._idl.txn._txn_rows[self.uuid] = self
    column_type = self._table.columns[column_name].type
    try:
        # The conversion result is discarded; this is validation only.
        data.Datum.from_python(column_type, key, _row_to_uuid)
    except error.Error as e:
        # XXX rate-limit
        vlog.err("attempting to write bad value to column %s (%s)"
                 % (column_name, e))
        return
    pending = self._mutations.setdefault('_inserts', {})
    pending.setdefault(column_name, set()).add(key)
a59912a0
RM
858
def delvalue(self, column_name, key):
    """Queues the removal of 'key' from column 'column_name' of this row.

    The row is registered with the current transaction, and 'key' is
    recorded in the row's pending '_removes' mutations.  A 'key' that cannot
    be converted to the column's type is logged and not recorded."""
    self._idl.txn._txn_rows[self.uuid] = self
    column_type = self._table.columns[column_name].type
    try:
        # The conversion result is discarded; this is validation only.
        data.Datum.from_python(column_type, key, _row_to_uuid)
    except error.Error as e:
        # XXX rate-limit
        vlog.err("attempting to delete bad value from column %s (%s)"
                 % (column_name, e))
        return
    pending = self._mutations.setdefault('_removes', {})
    pending.setdefault(column_name, set()).add(key)
a59912a0
RM
872
def setkey(self, column_name, key, value):
    """Queues setting 'key' to 'value' in map column 'column_name'.

    The row is registered with the current transaction.  If the pair
    {key: value} cannot be converted to the column's type, the problem is
    logged and nothing is queued.  When the row already has data for the
    column, 'key' is also queued for removal so that the old pair is deleted
    before the new one is inserted."""
    self._idl.txn._txn_rows[self.uuid] = self
    column_type = self._table.columns[column_name].type
    try:
        # The conversion result is discarded; this is validation only.
        data.Datum.from_python(column_type, {key: value}, _row_to_uuid)
    except error.Error as e:
        # XXX rate-limit
        vlog.err("attempting to write bad value to column %s (%s)"
                 % (column_name, e))
        return

    mutations = self._mutations
    if self._data and column_name in self._data:
        # Remove existing key/value before updating.
        mutations.setdefault('_removes', {}).setdefault(
            column_name, set()).add(key)
    mutations.setdefault('_inserts', {}).setdefault(
        column_name, {})[key] = value
891
def delkey(self, column_name, key, value=None):
    """Queues the removal of 'key' from map column 'column_name'.

    The row is registered with the current transaction.  If 'value' is not
    None, the removal is conditional: 'key' is queued only when this row's
    current data for the column maps 'key' to exactly 'value'; otherwise the
    call does nothing.  With 'value' left as None the key is queued
    unconditionally."""
    self._idl.txn._txn_rows[self.uuid] = self
    # Test against None rather than truthiness: "", 0 and False are
    # legitimate map values and must still trigger the comparison.
    if value is not None:
        try:
            old_value = data.Datum.to_python(self._data[column_name],
                                             _uuid_to_row)
        except error.Error:
            return
        if key not in old_value or old_value[key] != value:
            return
    removes = self._mutations.setdefault('_removes', {})
    removes.setdefault(column_name, set()).add(key)
908
d7d417fc
TW
@classmethod
def from_json(cls, idl, table, uuid, row_json):
    """Builds a row of 'table' from 'row_json', a dict that maps column
    names to JSON-encoded datums.

    Columns that 'table' does not know about, and datums that fail to parse,
    are logged and left out of the new row.  Returns cls(idl, table, uuid,
    parsed_columns)."""
    parsed = {}
    for column_name, datum_json in six.iteritems(row_json):
        column = table.columns.get(column_name)
        if column:
            try:
                parsed[column_name] = ovs.db.data.Datum.from_json(
                    column.type, datum_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
        else:
            # XXX rate-limit
            vlog.warn("unknown column %s in table %s"
                      % (column_name, table.name))
    return cls(idl, table, uuid, parsed)
928
8cdf0349
BP
def verify(self, column_name):
    """Requests that the original contents of column 'column_name' in this
    row be verified as a prerequisite to completing the transaction.  If
    'column_name' changed in this row (or this row was deleted) between the
    time the IDL read it and the time the transaction commits, the
    transaction aborts and Transaction.commit() returns
    Transaction.TRY_AGAIN.

    To make sure no transaction commits on the basis of dirty reads, an
    application should call Row.verify() on every data item it reads as part
    of a read-modify-write operation.

    Row.verify() is a no-op when the current value is already known:

      - The row has no original data (for example, a row created by the
        current transaction with Transaction.insert()).

      - The column has already been modified within the current
        transaction.

    Because of the second case, always call Row.verify() *before* modifying
    the column in a given read-modify-write.

    A transaction must be in progress."""
    assert self._idl.txn
    assert self._changes is not None
    if self._data and column_name not in self._changes:
        # The prerequisite is keyed by column name; the value is unused.
        self._prereqs[column_name] = None
960
def delete(self):
    """Deletes this row from its table.

    A row that has no original data is dropped from the transaction's row
    set; any other row is recorded in it so the deletion is seen at commit
    time.  In both cases the row's '_changes' is set to None and the row is
    removed from its table's 'rows'.

    A transaction must be in progress."""
    assert self._idl.txn
    assert self._changes is not None
    txn_rows = self._idl.txn._txn_rows
    if self._data is not None:
        txn_rows[self.uuid] = self
    else:
        del txn_rows[self.uuid]
    # Written through __dict__ so the row's __setattr__ is not invoked.
    self.__dict__["_changes"] = None
    del self._table.rows[self.uuid]
973
80c12152
SA
def fetch(self, column_name):
    """Asks the current transaction to fetch column 'column_name' of this
    row by forwarding the request to Transaction._fetch()."""
    txn = self._idl.txn
    txn._fetch(self, column_name)
976
94fbe1aa
BP
def increment(self, column_name):
    """Arranges for the transaction, when committed, to increment the value
    of 'column_name' within this row by 1.  'column_name' must have an
    integer type.  Once the transaction has committed successfully, the
    final (incremented) value is available from
    Transaction.get_increment_new_value().

    A client could get a similar effect by reading, writing and verify()ing
    the column, but increment() never (by itself) makes a transaction fail
    because of a verify error.

    The intended use is for incrementing the "next_cfg" column in
    the Open_vSwitch table."""
    txn = self._idl.txn
    txn._increment(self, column_name)
991
26bb0f31 992
8cdf0349
BP
def _uuid_name_from_uuid(uuid):
    """Returns the name used for 'uuid' in "uuid-name"/"named-uuid"
    references: "row" followed by the UUID text with dashes replaced by
    underscores."""
    text = str(uuid)
    return "row" + text.replace("-", "_")
995
26bb0f31 996
8cdf0349
BP
def _where_uuid_equals(uuid):
    """Returns a "where" clause (a list holding one condition) that matches
    the row whose "_uuid" column equals 'uuid'."""
    uuid_json = ["uuid", str(uuid)]
    condition = ["_uuid", "==", uuid_json]
    return [condition]
999
26bb0f31 1000
8cdf0349
BP
class _InsertedRow(object):
    """Bookkeeping for one row inserted by a transaction."""

    def __init__(self, op_index):
        # 'real' stays None until the "insert" reply has been processed.
        self.real = None
        # Position of the row's "insert" operation in the reply array.
        self.op_index = op_index
1005
26bb0f31 1006
class Transaction(object):
    """A transaction may modify the contents of a database by modifying the
    values of columns, deleting rows, inserting rows, or adding checks that
    columns in the database have not changed ("verify" operations), through
    Row methods.

    Reading and writing columns and inserting and deleting rows are all
    straightforward.  The reasons to verify columns are less obvious.
    Verification is the key to maintaining transactional integrity.  Because
    OVSDB handles multiple clients, it can happen that between the time that
    OVSDB client A reads a column and writes a new value, OVSDB client B has
    written that column.  Client A's write should not ordinarily overwrite
    client B's, especially if the column in question is a "map" column that
    contains several more or less independent data items.  If client A adds a
    "verify" operation before it writes the column, then the transaction fails
    in case client B modifies it first.  Client A will then see the new value
    of the column and compose a new transaction based on the new contents
    written by client B.

    When a transaction is complete, which must be before the next call to
    Idl.run(), call Transaction.commit() or Transaction.abort().

    The life-cycle of a transaction looks like this:

    1. Create the transaction and record the initial sequence number:

        seqno = idl.change_seqno
        txn = Transaction(idl)

    2. Modify the database with Row and Transaction methods.

    3. Commit the transaction by calling Transaction.commit().  The first call
       to this function probably returns Transaction.INCOMPLETE.  The client
       must keep calling again as long as this remains true, calling Idl.run()
       in between to let the IDL do protocol processing.  (If the client
       doesn't have anything else to do in the meantime, it can use
       Transaction.commit_block() to avoid having to loop itself.)

    4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
       to change from the saved 'seqno' (it's possible that it's already
       changed, in which case the client should not wait at all), then start
       over from step 1.  Only a call to Idl.run() will change the return value
       of Idl.change_seqno.  (Transaction.commit_block() calls Idl.run().)"""

    # Status values that Transaction.commit() can return.

    # Not yet committed or aborted.
    UNCOMMITTED = "uncommitted"
    # Transaction didn't include any changes.
    UNCHANGED = "unchanged"
    # Commit in progress, please wait.
    INCOMPLETE = "incomplete"
    # ovsdb_idl_txn_abort() called.
    ABORTED = "aborted"
    # Commit successful.
    SUCCESS = "success"
    # Commit failed because a "verify" operation
    # reported an inconsistency, due to a network
    # problem, or other transient failure.  Wait
    # for a change, then try again.
    TRY_AGAIN = "try again"
    # Server hasn't given us the lock yet.
    NOT_LOCKED = "not locked"
    # Commit failed due to a hard error.
    ERROR = "error"

    @staticmethod
    def status_to_string(status):
        """Converts one of the status values that Transaction.commit() can
        return into a human-readable string.

        (The status values are in fact such strings already, so
        there's nothing to do.)"""
        return status

    def __init__(self, idl):
        """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
        A given Idl may only have a single active transaction at a time.

        A Transaction may modify the contents of a database by assigning new
        values to columns (attributes of Row), deleting rows (with
        Row.delete()), or inserting rows (with Transaction.insert()).  It may
        also check that columns in the database have not changed with
        Row.verify().

        When a transaction is complete (which must be before the next call to
        Idl.run()), call Transaction.commit() or Transaction.abort()."""
        assert idl.txn is None

        idl.txn = self
        self._request_id = None
        self.idl = idl
        self.dry_run = False
        self._txn_rows = {}
        self._status = Transaction.UNCOMMITTED
        self._error = None
        self._comments = []

        self._inc_row = None
        self._inc_column = None
        # Set by commit() / __process_inc_reply(); initialised here so that
        # reading them early yields None rather than AttributeError.
        self._inc_index = None
        self._inc_new_value = None

        self._fetch_requests = []

        self._inserted_rows = {}  # Map from UUID to _InsertedRow

    def add_comment(self, comment):
        """Appends 'comment' to the comments that will be passed to the OVSDB
        server when this transaction is committed.  (The comment will be
        committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
        relatively human-readable form.)"""
        self._comments.append(comment)

    def wait(self, poller):
        """Causes poll_block() to wake up if this transaction has completed
        committing."""
        if self._status not in (Transaction.UNCOMMITTED,
                                Transaction.INCOMPLETE):
            poller.immediate_wake()

    def _substitute_uuids(self, json):
        """Returns 'json' with every ["uuid", <uuid>] pair that refers to a
        row inserted by this transaction (a row in '_txn_rows' whose '_data'
        is None) replaced by the matching ["named-uuid", <name>] pair.

        Other lists and tuples are rebuilt as lists with the substitution
        applied to each element; any other value is returned unchanged."""
        if isinstance(json, (list, tuple)):
            if (len(json) == 2
                    and json[0] == 'uuid'
                    and ovs.ovsuuid.is_valid_string(json[1])):
                uuid = ovs.ovsuuid.from_string(json[1])
                row = self._txn_rows.get(uuid, None)
                if row and row._data is None:
                    return ["named-uuid", _uuid_name_from_uuid(uuid)]
            else:
                return [self._substitute_uuids(elem) for elem in json]
        return json

    def __disassemble(self):
        """Detaches this transaction from its Idl and rolls back every change
        it made to the IDL's in-memory rows."""
        self.idl.txn = None

        for row in six.itervalues(self._txn_rows):
            if row._changes is None:
                # Deleted in this transaction: put it back in its table.
                row._table.rows[row.uuid] = row
            elif row._data is None:
                # Inserted in this transaction: take it out of its table.
                del row._table.rows[row.uuid]
            row.__dict__["_changes"] = {}
            row.__dict__["_mutations"] = {}
            row.__dict__["_prereqs"] = {}
        self._txn_rows = {}

    def __set_elements_to_json(self, column, elements):
        """Returns the JSON form of each of 'elements' (with UUIDs of newly
        inserted rows substituted), as a list, for use in a "set" mutation on
        'column'.  Lets error.Error from the conversion propagate."""
        converted = []
        for ele in elements:
            datum = data.Datum.from_python(column.type, ele, _row_to_uuid)
            converted.append(self._substitute_uuids(datum.to_json()))
        return converted

    def commit(self):
        """Attempts to commit 'txn'.  Returns the status of the commit
        operation, one of the following constants:

          Transaction.INCOMPLETE:

              The transaction is in progress, but not yet complete.  The caller
              should call again later, after calling Idl.run() to let the
              IDL do OVSDB protocol processing.

          Transaction.UNCHANGED:

              The transaction is complete.  (It didn't actually change the
              database, so the IDL didn't send any request to the database
              server.)

          Transaction.ABORTED:

              The caller previously called Transaction.abort().

          Transaction.SUCCESS:

              The transaction was successful.  The update made by the
              transaction (and possibly other changes made by other database
              clients) should already be visible in the IDL.

          Transaction.TRY_AGAIN:

              The transaction failed for some transient reason, e.g. because a
              "verify" operation reported an inconsistency or due to a network
              problem.  The caller should wait for a change to the database,
              then compose a new transaction, and commit the new transaction.

              Use Idl.change_seqno to wait for a change in the database.  It is
              important to use its value *before* the initial call to
              Transaction.commit() as the baseline for this purpose, because
              the change that one should wait for can happen after the initial
              call but before the call that returns Transaction.TRY_AGAIN, and
              using some other baseline value in that situation could cause an
              indefinite wait if the database rarely changes.

          Transaction.NOT_LOCKED:

              The transaction failed because the IDL has been configured to
              require a database lock (with Idl.set_lock()) but didn't
              get it yet or has already lost it.

        Committing a transaction rolls back all of the changes that it made to
        the IDL's copy of the database.  If the transaction commits
        successfully, then the database server will send an update and, thus,
        the IDL will be updated with the committed changes."""
        # The status can only change if we're the active transaction.
        # (Otherwise, our status will change only in Idl.run().)
        if self != self.idl.txn:
            return self._status

        # If we need a lock but don't have it, give up quickly.
        if self.idl.lock_name and not self.idl.has_lock:
            self._status = Transaction.NOT_LOCKED
            self.__disassemble()
            return self._status

        # The first element is the database name, so the reply for
        # operations[i] is found at index i - 1 of the result array.
        operations = [self.idl._db.name]

        # Assert that we have the required lock (avoiding a race).
        if self.idl.lock_name:
            operations.append({"op": "assert",
                               "lock": self.idl.lock_name})

        # Add prerequisites and declarations of new rows.
        for row in six.itervalues(self._txn_rows):
            if row._prereqs:
                rows = {}
                columns = []
                for column_name in row._prereqs:
                    columns.append(column_name)
                    rows[column_name] = row._data[column_name].to_json()
                operations.append({"op": "wait",
                                   "table": row._table.name,
                                   "timeout": 0,
                                   "where": _where_uuid_equals(row.uuid),
                                   "until": "==",
                                   "columns": columns,
                                   "rows": [rows]})

        # Add updates.
        any_updates = False
        for row in six.itervalues(self._txn_rows):
            if row._changes is None:
                if row._table.is_root:
                    operations.append({"op": "delete",
                                       "table": row._table.name,
                                       "where": _where_uuid_equals(row.uuid)})
                    any_updates = True
                else:
                    # Let ovsdb-server decide whether to really delete it.
                    pass
            elif row._changes:
                op = {"table": row._table.name}
                if row._data is None:
                    op["op"] = "insert"
                    op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
                    any_updates = True

                    op_index = len(operations) - 1
                    self._inserted_rows[row.uuid] = _InsertedRow(op_index)
                else:
                    op["op"] = "update"
                    op["where"] = _where_uuid_equals(row.uuid)

                row_json = {}
                op["row"] = row_json

                for column_name, datum in six.iteritems(row._changes):
                    if row._data is not None or not datum.is_default():
                        row_json[column_name] = (
                            self._substitute_uuids(datum.to_json()))

                        # If anything really changed, consider it an update.
                        # We can't suppress not-really-changed values earlier
                        # or transactions would become nonatomic (see the big
                        # comment inside Transaction._write()).
                        if (not any_updates and row._data is not None and
                                row._data[column_name] != datum):
                            any_updates = True

                if row._data is None or row_json:
                    operations.append(op)

            if row._mutations:
                addop = False
                op = {"table": row._table.name}
                op["op"] = "mutate"
                if row._data is None:
                    # New row
                    op["where"] = self._substitute_uuids(
                        _where_uuid_equals(row.uuid))
                else:
                    # Existing row
                    op["where"] = _where_uuid_equals(row.uuid)
                op["mutations"] = []
                if '_removes' in row._mutations:
                    for col, dat in six.iteritems(row._mutations['_removes']):
                        column = row._table.columns[col]
                        if column.type.is_map():
                            opdat = ["set", list(dat)]
                        else:
                            try:
                                opdat = ["set", self.__set_elements_to_json(
                                    column, dat)]
                            except error.Error:
                                # NOTE(review): this returns None and leaves
                                # the transaction active; kept as in the
                                # original (values are validated when queued).
                                return
                        op["mutations"].append([col, "delete", opdat])
                        addop = True
                if '_inserts' in row._mutations:
                    for col, val in six.iteritems(row._mutations['_inserts']):
                        column = row._table.columns[col]
                        if column.type.is_map():
                            datum = data.Datum.from_python(column.type, val,
                                                           _row_to_uuid)
                            opdat = ["map", datum.as_list()]
                        else:
                            try:
                                opdat = ["set", self.__set_elements_to_json(
                                    column, val)]
                            except error.Error:
                                # NOTE(review): see the matching note above.
                                return
                        op["mutations"].append([col, "insert", opdat])
                        addop = True
                if addop:
                    operations.append(op)
                    any_updates = True

        if self._fetch_requests:
            for fetch in self._fetch_requests:
                # Remember where this "select" reply will be found.
                fetch["index"] = len(operations) - 1
                operations.append({"op": "select",
                                   "table": fetch["row"]._table.name,
                                   "where": self._substitute_uuids(
                                       _where_uuid_equals(fetch["row"].uuid)),
                                   "columns": [fetch["column_name"]]})
            any_updates = True

        # Add increment.
        if self._inc_row and any_updates:
            self._inc_index = len(operations) - 1

            operations.append({"op": "mutate",
                               "table": self._inc_row._table.name,
                               "where": self._substitute_uuids(
                                   _where_uuid_equals(self._inc_row.uuid)),
                               "mutations": [[self._inc_column, "+=", 1]]})
            operations.append({"op": "select",
                               "table": self._inc_row._table.name,
                               "where": self._substitute_uuids(
                                   _where_uuid_equals(self._inc_row.uuid)),
                               "columns": [self._inc_column]})

        # Add comment.
        if self._comments:
            operations.append({"op": "comment",
                               "comment": "\n".join(self._comments)})

        # Dry run?
        if self.dry_run:
            operations.append({"op": "abort"})

        if not any_updates:
            self._status = Transaction.UNCHANGED
        else:
            msg = ovs.jsonrpc.Message.create_request("transact", operations)
            self._request_id = msg.id
            # A falsy return from send() is treated as success.
            if not self.idl._session.send(msg):
                self.idl._outstanding_txns[self._request_id] = self
                self._status = Transaction.INCOMPLETE
            else:
                self._status = Transaction.TRY_AGAIN

        self.__disassemble()
        return self._status

    def commit_block(self):
        """Attempts to commit this transaction, blocking until the commit
        either succeeds or fails.  Returns the final commit status, which may
        be any Transaction.* value other than Transaction.INCOMPLETE.

        This function calls Idl.run() on this transaction's IDL, so it may
        cause Idl.change_seqno to change."""
        while True:
            status = self.commit()
            if status != Transaction.INCOMPLETE:
                return status

            self.idl.run()

            poller = ovs.poller.Poller()
            self.idl.wait(poller)
            self.wait(poller)
            poller.block()

    def get_increment_new_value(self):
        """Returns the final (incremented) value of the column in this
        transaction that was set to be incremented by Row.increment.  This
        transaction must have committed successfully."""
        assert self._status == Transaction.SUCCESS
        return self._inc_new_value

    def abort(self):
        """Aborts this transaction.  If Transaction.commit() has already been
        called then the transaction might get committed anyhow."""
        self.__disassemble()
        if self._status in (Transaction.UNCOMMITTED,
                            Transaction.INCOMPLETE):
            self._status = Transaction.ABORTED

    def get_error(self):
        """Returns a string representing this transaction's current status,
        suitable for use in log messages."""
        if self._status != Transaction.ERROR:
            return Transaction.status_to_string(self._status)
        elif self._error:
            return self._error
        else:
            return "no error details available"

    def __set_error_json(self, json):
        """Records 'json', serialized to a string, as this transaction's
        error details unless details were already recorded."""
        if self._error is None:
            self._error = ovs.json.to_string(json)

    def get_insert_uuid(self, uuid):
        """Finds and returns the permanent UUID that the database assigned to a
        newly inserted row, given the UUID that Transaction.insert() assigned
        locally to that row.

        Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
        or if it was assigned by that function and then deleted by Row.delete()
        within the same transaction.  (Rows that are inserted and then deleted
        within a single transaction are never sent to the database server, so
        it never assigns them a permanent UUID.)

        This transaction must have completed successfully."""
        assert self._status in (Transaction.SUCCESS,
                                Transaction.UNCHANGED)
        inserted_row = self._inserted_rows.get(uuid)
        if inserted_row:
            return inserted_row.real
        return None

    def _increment(self, row, column):
        """Records that 'column' of 'row' is to be incremented at commit.
        Only one increment may be registered per transaction."""
        assert not self._inc_row
        self._inc_row = row
        self._inc_column = column

    def _fetch(self, row, column_name):
        """Records a request to fetch 'column_name' of 'row' at commit."""
        self._fetch_requests.append({"row": row, "column_name": column_name})

    def _write(self, row, column, datum):
        """Stages 'datum' as the new value of 'column' in 'row' and registers
        'row' with the row's active transaction.  Pending insert/remove
        mutations for the same column are discarded."""
        assert row._changes is not None
        assert row._mutations is not None

        txn = row._idl.txn

        # If this is a write-only column and the datum being written is the
        # same as the one already there, just skip the update entirely.  This
        # is worth optimizing because we have a lot of columns that get
        # periodically refreshed into the database but don't actually change
        # that often.
        #
        # We don't do this for read/write columns because that would break
        # atomicity of transactions--some other client might have written a
        # different value in that column since we read it.  (But if a whole
        # transaction only does writes of existing values, without making any
        # real changes, we will drop the whole transaction later in
        # ovsdb_idl_txn_commit().)
        if (not column.alert and row._data and
                row._data.get(column.name) == datum):
            new_value = row._changes.get(column.name)
            if new_value is None or new_value == datum:
                return

        txn._txn_rows[row.uuid] = row
        if '_inserts' in row._mutations:
            row._mutations['_inserts'].pop(column.name, None)
        if '_removes' in row._mutations:
            row._mutations['_removes'].pop(column.name, None)
        row._changes[column.name] = datum.copy()

    def insert(self, table, new_uuid=None):
        """Inserts and returns a new row in 'table', which must be one of the
        ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.

        The new row is assigned a provisional UUID.  If 'new_uuid' is None then
        one is randomly generated; otherwise 'new_uuid' should specify a
        randomly generated uuid.UUID not otherwise in use.  ovsdb-server will
        assign a different UUID when 'txn' is committed, but the IDL will
        replace any uses of the provisional UUID in the data to be committed by
        the UUID assigned by ovsdb-server."""
        assert self._status == Transaction.UNCOMMITTED
        if new_uuid is None:
            new_uuid = uuid.uuid4()
        row = Row(self.idl, table, new_uuid, None)
        table.rows[row.uuid] = row
        self._txn_rows[row.uuid] = row
        return row

    def _process_reply(self, msg):
        """Updates this transaction's status from 'msg', the JSON-RPC reply
        to its "transact" request.

        A reply whose result is not a JSON array is logged and leaves the
        status unchanged."""
        if msg.type == ovs.jsonrpc.Message.T_ERROR:
            self._status = Transaction.ERROR
        elif not isinstance(msg.result, (list, tuple)):
            # XXX rate-limit
            vlog.warn('reply to "transact" is not JSON array')
        else:
            hard_errors = False
            soft_errors = False
            lock_errors = False

            ops = msg.result
            for op in ops:
                if op is None:
                    # This isn't an error in itself but indicates that some
                    # prior operation failed, so make sure that we know about
                    # it.
                    soft_errors = True
                elif isinstance(op, dict):
                    # Named 'err' so the imported 'error' module is not
                    # shadowed inside this method.
                    err = op.get("error")
                    if err is not None:
                        if err == "timed out":
                            soft_errors = True
                        elif err == "not owner":
                            lock_errors = True
                        elif err == "aborted":
                            pass
                        else:
                            hard_errors = True
                            self.__set_error_json(op)
                else:
                    hard_errors = True
                    self.__set_error_json(op)
                    # XXX rate-limit
                    vlog.warn("operation reply is not JSON null or object")

            if not soft_errors and not hard_errors and not lock_errors:
                if self._inc_row and not self.__process_inc_reply(ops):
                    hard_errors = True
                if self._fetch_requests:
                    if self.__process_fetch_reply(ops):
                        self.idl.change_seqno += 1
                    else:
                        hard_errors = True

                for insert in six.itervalues(self._inserted_rows):
                    if not self.__process_insert_reply(insert, ops):
                        hard_errors = True

            if hard_errors:
                self._status = Transaction.ERROR
            elif lock_errors:
                self._status = Transaction.NOT_LOCKED
            elif soft_errors:
                self._status = Transaction.TRY_AGAIN
            else:
                self._status = Transaction.SUCCESS

    @staticmethod
    def __check_json_type(json, types, name):
        """Returns True if 'json' is present (not None) and is an instance of
        one of 'types'; otherwise logs a warning that mentions 'name' and
        returns False."""
        # Only None means "missing".  Falsy values such as 0, [] and {} are
        # present and must go through the type check like any other value.
        if json is None:
            # XXX rate-limit
            vlog.warn("%s is missing" % name)
            return False
        elif not isinstance(json, tuple(types)):
            # XXX rate-limit
            vlog.warn("%s has unexpected type %s" % (name, type(json)))
            return False
        else:
            return True

    def __process_fetch_reply(self, ops):
        """Stores the columns returned for this transaction's fetch requests
        into the corresponding rows' '_data'.

        Returns True if at least one column was stored.  Returns False if
        none was, or as soon as a "select" reply has a missing or mistyped
        "rows" member."""
        update = False
        for fetch_request in self._fetch_requests:
            row = fetch_request["row"]
            column_name = fetch_request["column_name"]
            index = fetch_request["index"]
            table = row._table

            select = ops[index]
            fetched_rows = select.get("rows")
            if not Transaction.__check_json_type(fetched_rows, (list, tuple),
                                                 '"select" reply "rows"'):
                return False
            if len(fetched_rows) != 1:
                # XXX rate-limit
                vlog.warn('"select" reply "rows" has %d elements '
                          'instead of 1' % len(fetched_rows))
                continue
            fetched_row = fetched_rows[0]
            if not Transaction.__check_json_type(fetched_row, (dict,),
                                                 '"select" reply row'):
                continue

            column = table.columns.get(column_name)
            datum_json = fetched_row.get(column_name)
            datum = data.Datum.from_json(column.type, datum_json)

            row._data[column_name] = datum
            update = True

        return update

    def __process_inc_reply(self, ops):
        """Validates the "mutate" and "select" replies produced for the
        increment and saves the new column value in '_inc_new_value'.

        Returns True on success, False if the reply is malformed."""
        if self._inc_index + 2 > len(ops):
            # XXX rate-limit
            vlog.warn("reply does not contain enough operations for "
                      "increment (has %d, needs %d)" %
                      (len(ops), self._inc_index + 2))
            # Without this, the lookups below would index past the end.
            return False

        # We know that this is a JSON object because the loop in
        # __process_reply() already checked.
        mutate = ops[self._inc_index]
        count = mutate.get("count")
        if not Transaction.__check_json_type(count, six.integer_types,
                                             '"mutate" reply "count"'):
            return False
        if count != 1:
            # XXX rate-limit
            vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
            return False

        select = ops[self._inc_index + 1]
        rows = select.get("rows")
        if not Transaction.__check_json_type(rows, (list, tuple),
                                             '"select" reply "rows"'):
            return False
        if len(rows) != 1:
            # XXX rate-limit
            vlog.warn('"select" reply "rows" has %d elements '
                      'instead of 1' % len(rows))
            return False
        row = rows[0]
        if not Transaction.__check_json_type(row, (dict,),
                                             '"select" reply row'):
            return False
        column = row.get(self._inc_column)
        if not Transaction.__check_json_type(column, six.integer_types,
                                             '"select" reply inc column'):
            return False
        self._inc_new_value = column
        return True

    def __process_insert_reply(self, insert, ops):
        """Extracts the UUID reported by the "insert" reply that belongs to
        'insert' (an _InsertedRow) and stores it in 'insert.real'.

        Returns True on success, False if the reply is malformed."""
        if insert.op_index >= len(ops):
            # XXX rate-limit
            vlog.warn("reply does not contain enough operations "
                      "for insert (has %d, needs %d)"
                      % (len(ops), insert.op_index))
            return False

        # We know that this is a JSON object because the loop in
        # __process_reply() already checked.
        reply = ops[insert.op_index]
        json_uuid = reply.get("uuid")
        if not Transaction.__check_json_type(json_uuid, (tuple, list),
                                             '"insert" reply "uuid"'):
            return False

        try:
            uuid_ = ovs.ovsuuid.from_json(json_uuid)
        except error.Error:
            # XXX rate-limit
            vlog.warn('"insert" reply "uuid" is not a JSON UUID')
            return False

        insert.real = uuid_
        return True
ad0991e6
EJ
1680
1681
1682class SchemaHelper(object):
1683 """IDL Schema helper.
1684
1685 This class encapsulates the logic required to generate schemas suitable
1686 for creating 'ovs.db.idl.Idl' objects. Clients should register columns
1687 they are interested in using register_columns(). When finished, the
1688 get_idl_schema() function may be called.
1689
1690 The location on disk of the schema used may be found in the
1691 'schema_location' variable."""
1692
e15ad8e6
IY
def __init__(self, location=None, schema_json=None):
    """Creates a new Schema object.

    'location' is the file path of the OVS schema; None selects the
    default, "vswitch.ovsschema" under ovs.dirs.PKGDATADIR.
    'schema_json' is the schema as an in-memory JSON representation.

    At most one of the two may be given; ValueError is raised when both
    are supplied.
    """
    if location and schema_json:
        raise ValueError("both location and schema_json can't be "
                         "specified. it's ambiguous.")

    if schema_json is None:
        # No in-memory schema was supplied, so load it from disk.
        path = location
        if path is None:
            path = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
        schema_json = ovs.json.from_file(path)

    self.schema_json = schema_json
    self._tables = {}
    self._readonly = {}
    self._all = False
ad0991e6 1712
80c12152 1713 def register_columns(self, table, columns, readonly=[]):
ad0991e6
EJ
1714 """Registers interest in the given 'columns' of 'table'. Future calls
1715 to get_idl_schema() will include 'table':column for each column in
1716 'columns'. This function automatically avoids adding duplicate entries
1717 to the schema.
80c12152
SA
1718 A subset of 'columns' can be specified as 'readonly'. The readonly
1719 columns are not replicated but can be fetched on-demand by the user
1720 with Row.fetch().
ad0991e6
EJ
1721
1722 'table' must be a string.
1723 'columns' must be a list of strings.
80c12152 1724 'readonly' must be a list of strings.
ad0991e6
EJ
1725 """
1726
da2d45c6
RB
1727 assert isinstance(table, six.string_types)
1728 assert isinstance(columns, list)
ad0991e6
EJ
1729
1730 columns = set(columns) | self._tables.get(table, set())
1731 self._tables[table] = columns
80c12152 1732 self._readonly[table] = readonly
ad0991e6 1733
7698e31d
IY
1734 def register_table(self, table):
1735 """Registers interest in the given all columns of 'table'. Future calls
1736 to get_idl_schema() will include all columns of 'table'.
1737
1738 'table' must be a string
1739 """
da2d45c6 1740 assert isinstance(table, six.string_types)
7698e31d
IY
1741 self._tables[table] = set() # empty set means all columns in the table
1742
bf42f674
EJ
1743 def register_all(self):
1744 """Registers interest in every column of every table."""
1745 self._all = True
1746
ad0991e6
EJ
1747 def get_idl_schema(self):
1748 """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1749 object based on columns registered using the register_columns()
1750 function."""
1751
e15ad8e6
IY
1752 schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1753 self.schema_json = None
ad0991e6 1754
bf42f674
EJ
1755 if not self._all:
1756 schema_tables = {}
cb96c1b2 1757 for table, columns in six.iteritems(self._tables):
bf42f674
EJ
1758 schema_tables[table] = (
1759 self._keep_table_columns(schema, table, columns))
1760
1761 schema.tables = schema_tables
80c12152 1762 schema.readonly = self._readonly
ad0991e6
EJ
1763 return schema
1764
1765 def _keep_table_columns(self, schema, table_name, columns):
1766 assert table_name in schema.tables
1767 table = schema.tables[table_name]
1768
7698e31d
IY
1769 if not columns:
1770 # empty set means all columns in the table
1771 return table
1772
ad0991e6
EJ
1773 new_columns = {}
1774 for column_name in columns:
da2d45c6 1775 assert isinstance(column_name, six.string_types)
ad0991e6
EJ
1776 assert column_name in table.columns
1777
1778 new_columns[column_name] = table.columns[column_name]
1779
1780 table.columns = new_columns
1781 return table