# Copyright (c) 2009, 2010, 2011, 2012, 2013, 2016 Nicira, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import uuid

import ovs.db.data as data
import ovs.db.parser
import ovs.db.schema
import ovs.jsonrpc
import ovs.ovsuuid
import ovs.poller
import ovs.vlog
from ovs.db import custom_index
from ovs.db import error

vlog = ovs.vlog.Vlog("idl")

__pychecker__ = 'no-classattr no-objattrs'

ROW_CREATE = "create"
ROW_UPDATE = "update"
ROW_DELETE = "delete"

OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1

CLUSTERED = "clustered"


class Idl(object):
    """Open vSwitch Database Interface Definition Language (OVSDB IDL).

    The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
    requests to an OVSDB database server and parses the responses, converting
    raw JSON into data structures that are easier for clients to digest.

    The IDL also assists with issuing database transactions. The client
    creates a transaction, manipulates the IDL data structures, and commits or
    aborts the transaction. The IDL then composes and issues the necessary
    JSON-RPC requests and reports to the client whether the transaction
    completed successfully.

    The client is allowed to access the following attributes directly, in a
    read-only fashion:

    - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
      to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
      annotated with a new attribute 'rows', which is a dict from a uuid.UUID
      to a Row object.

      The client may directly read and write the Row objects referenced by the
      'rows' map values. Refer to Row for more details.

    - 'change_seqno': A number that represents the IDL's state. When the IDL
      is updated (by Idl.run()), its value changes. The sequence number can
      occasionally change even if the database does not. This happens if the
      connection to the database drops and reconnects, which causes the
      database contents to be reloaded even if they didn't change. (It could
      also happen if the database server sends out a "change" that reflects
      what the IDL already thought was in the database. The database server is
      not supposed to do that, but bugs could in theory cause it to do so.)

    - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
      if no lock is configured.

    - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
      lock, and False otherwise.

      Locking and unlocking happens asynchronously from the database client's
      point of view, so the information is only useful for optimization
      (e.g. if the client doesn't have the lock then there's no point in trying
      to write to the database).

    - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
      the database server has indicated that some other client already owns the
      requested lock, and False otherwise.

    - 'txn': The ovs.db.idl.Transaction object for the database transaction
      currently being constructed, if there is one, or None otherwise.
"""

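    # A minimal usage sketch (illustrative only, not part of the IDL itself):
    # assuming 'helper' is a SchemaHelper already populated with the tables
    # of interest and 'remote' is an OVSDB connection string, a client loop
    # might look like:
    #
    #     idl = ovs.db.idl.Idl(remote, helper)
    #     seqno = idl.change_seqno
    #     while True:
    #         idl.run()
    #         if seqno != idl.change_seqno:
    #             seqno = idl.change_seqno
    #             # ... inspect idl.tables[...].rows here ...
    #         poller = ovs.poller.Poller()
    #         idl.wait(poller)
    #         poller.block()
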
    IDL_S_INITIAL = 0
    IDL_S_SERVER_SCHEMA_REQUESTED = 1
    IDL_S_SERVER_MONITOR_REQUESTED = 2
    IDL_S_DATA_MONITOR_REQUESTED = 3
    IDL_S_DATA_MONITOR_COND_REQUESTED = 4

    def __init__(self, remote, schema_helper, probe_interval=None,
                 leader_only=True):
        """Creates and returns a connection to the database described by
        'schema_helper', at 'remote', which should be in a form acceptable to
        ovs.jsonrpc.session.open(). The connection will maintain an in-memory
        replica of the remote database.

        'remote' may also be a comma-separated list of remotes, each of which
        should be in a form acceptable to ovs.jsonrpc.session.open().

        'schema_helper' should be an instance of the SchemaHelper class, which
        generates the schema for the remote database. The caller may have cut
        it down by removing tables or columns that are not of interest. The
        IDL will only replicate the tables and columns that remain. The caller
        may also add an attribute named 'alert' to selected remaining columns,
        setting its value to False; if so, then changes to those columns will
        not be considered changes to the database for the purpose of the return
        value of Idl.run() and Idl.change_seqno. This is useful for columns
        that the IDL's client will write but not read.

        The IDL uses and modifies the schema obtained from 'schema_helper'
        directly.

        If 'leader_only' is set to True (the default) the IDL will only
        monitor and transact with the leader of the cluster.

        If 'probe_interval' is zero, the connection keepalive feature is
        disabled. If nonzero, the value will be forced to at least 1000
        milliseconds. If None, the default value in OVS is used.
        """

        assert isinstance(schema_helper, SchemaHelper)
        schema = schema_helper.get_idl_schema()

        self.tables = schema.tables
        self.readonly = schema.readonly
        self._db = schema
        remotes = self._parse_remotes(remote)
        self._session = ovs.jsonrpc.Session.open_multiple(remotes,
            probe_interval=probe_interval)
        self._monitor_request_id = None
        self._last_seqno = None
        self.change_seqno = 0
        self.uuid = uuid.uuid1()

        # Server monitor.
        self._server_schema_request_id = None
        self._server_monitor_request_id = None
        self._db_change_aware_request_id = None
        self._server_db_name = '_Server'
        self._server_db_table = 'Database'
        self.server_tables = None
        self._server_db = None
        self.server_monitor_uuid = uuid.uuid1()
        self.leader_only = leader_only
        self.cluster_id = None
        self._min_index = 0

        self.state = self.IDL_S_INITIAL

        # Database locking.
        self.lock_name = None          # Name of lock we need, None if none.
        self.has_lock = False          # Has db server said we have the lock?
        self.is_lock_contended = False  # Has db server said we can't get lock?
        self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.

        # Transaction support.
        self.txn = None
        self._outstanding_txns = {}

        for table in schema.tables.values():
            for column in table.columns.values():
                if not hasattr(column, 'alert'):
                    column.alert = True
            table.need_table = False
            table.rows = custom_index.IndexedRows(table)
            table.idl = self
            table.condition = [True]
            table.cond_changed = False

    def _parse_remotes(self, remote):
        # If remote is -
        # "tcp:10.0.0.1:6641,unix:/tmp/db.sock,t,s,tcp:10.0.0.2:6642"
        # this function returns
        # ["tcp:10.0.0.1:6641", "unix:/tmp/db.sock,t,s", "tcp:10.0.0.2:6642"]
        remotes = []
        for r in remote.split(','):
            if remotes and r.find(":") == -1:
                remotes[-1] += "," + r
            else:
                remotes.append(r)
        return remotes

    def set_cluster_id(self, cluster_id):
        """Set the id of the cluster that this idl must connect to."""
        self.cluster_id = cluster_id
        if self.state != self.IDL_S_INITIAL:
            self.force_reconnect()

    def index_create(self, table, name):
        """Create a named multi-column index on a table"""
        return self.tables[table].rows.index_create(name)

    def index_irange(self, table, name, start, end):
        """Return items in a named index between start/end inclusive"""
        return self.tables[table].rows.indexes[name].irange(start, end)

    def index_equal(self, table, name, value):
        """Return items in a named index matching a value"""
        return self.tables[table].rows.indexes[name].irange(value, value)

    def close(self):
        """Closes the connection to the database. The IDL will no longer
        update."""
        self._session.close()

    def run(self):
        """Processes a batch of messages from the database server. Returns
        True if the database as seen through the IDL changed, False if it did
        not change. The initial fetch of the entire contents of the remote
        database is considered to be one kind of change. If the IDL has been
        configured to acquire a database lock (with Idl.set_lock()), then
        successfully acquiring the lock is also considered to be a change.

        This function can return occasional false positives, that is, report
        that the database changed even though it didn't. This happens if the
        connection to the database drops and reconnects, which causes the
        database contents to be reloaded even if they didn't change. (It could
        also happen if the database server sends out a "change" that reflects
        what we already thought was in the database, but the database server is
        not supposed to do that.)

        As an alternative to checking the return value, the client may check
        for changes in self.change_seqno."""
        assert not self.txn
        initial_change_seqno = self.change_seqno

        self.send_cond_change()
        self._session.run()
        i = 0
        while i < 50:
            i += 1
            if not self._session.is_connected():
                break

            seqno = self._session.get_seqno()
            if seqno != self._last_seqno:
                self._last_seqno = seqno
                self.__txn_abort_all()
                self.__send_server_schema_request()
                if self.lock_name:
                    self.__send_lock_request()
                break

            msg = self._session.recv()
            if msg is None:
                break

            if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update2"
                    and len(msg.params) == 2):
                # Database contents changed.
                self.__parse_update(msg.params[1], OVSDB_UPDATE2)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update"
                    and len(msg.params) == 2):
                # Database contents changed.
                if msg.params[0] == str(self.server_monitor_uuid):
                    self.__parse_update(msg.params[1], OVSDB_UPDATE,
                                        tables=self.server_tables)
                    self.change_seqno = initial_change_seqno
                    if not self.__check_server_db():
                        self.force_reconnect()
                        break
                else:
                    self.__parse_update(msg.params[1], OVSDB_UPDATE)
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._monitor_request_id is not None
                    and self._monitor_request_id == msg.id):
                # Reply to our "monitor" request.
                try:
                    self.change_seqno += 1
                    self._monitor_request_id = None
                    self.__clear()
                    if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
                        self.__parse_update(msg.result, OVSDB_UPDATE2)
                    else:
                        assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
                        self.__parse_update(msg.result, OVSDB_UPDATE)

                except error.Error as e:
                    vlog.err("%s: parse error in received schema: %s"
                             % (self._session.get_name(), e))
                    self.__error()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._server_schema_request_id is not None
                    and self._server_schema_request_id == msg.id):
                # Reply to our "get_schema" of _Server request.
                try:
                    self._server_schema_request_id = None
                    sh = SchemaHelper(None, msg.result)
                    sh.register_table(self._server_db_table)
                    schema = sh.get_idl_schema()
                    self._server_db = schema
                    self.server_tables = schema.tables
                    self.__send_server_monitor_request()
                except error.Error as e:
                    vlog.err("%s: error receiving server schema: %s"
                             % (self._session.get_name(), e))
                    if self.cluster_id:
                        self.__error()
                        break
                    else:
                        self.change_seqno = initial_change_seqno
                        self.__send_monitor_request()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._server_monitor_request_id is not None
                    and self._server_monitor_request_id == msg.id):
                # Reply to our "monitor" of _Server request.
                try:
                    self._server_monitor_request_id = None
                    self.__parse_update(msg.result, OVSDB_UPDATE,
                                        tables=self.server_tables)
                    self.change_seqno = initial_change_seqno
                    if self.__check_server_db():
                        self.__send_monitor_request()
                        self.__send_db_change_aware()
                    else:
                        self.force_reconnect()
                        break
                except error.Error as e:
                    vlog.err("%s: parse error in received schema: %s"
                             % (self._session.get_name(), e))
                    if self.cluster_id:
                        self.__error()
                        break
                    else:
                        self.change_seqno = initial_change_seqno
                        self.__send_monitor_request()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._db_change_aware_request_id is not None
                    and self._db_change_aware_request_id == msg.id):
                # Reply to us notifying the server of our change awareness.
                self._db_change_aware_request_id = None
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._lock_request_id is not None
                    and self._lock_request_id == msg.id):
                # Reply to our "lock" request.
                self.__parse_lock_reply(msg.result)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "locked"):
                # We got our lock.
                self.__parse_lock_notify(msg.params, True)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "stolen"):
                # Someone else stole our lock.
                self.__parse_lock_notify(msg.params, False)
            elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
                # Reply to our echo request. Ignore it.
                pass
            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                  self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
                  self._monitor_request_id == msg.id):
                if msg.error == "unknown method":
                    self.__send_monitor_request()
            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                  self._server_schema_request_id is not None and
                  self._server_schema_request_id == msg.id):
                self._server_schema_request_id = None
                if self.cluster_id:
                    self.force_reconnect()
                    break
                else:
                    self.change_seqno = initial_change_seqno
                    self.__send_monitor_request()
            elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                               ovs.jsonrpc.Message.T_REPLY)
                    and self.__txn_process_reply(msg)):
                # __txn_process_reply() did everything needed.
                pass
            else:
                # This can happen if a transaction is destroyed before we
                # receive the reply, so keep the log level low.
                vlog.dbg("%s: received unexpected %s message"
                         % (self._session.get_name(),
                            ovs.jsonrpc.Message.type_to_string(msg.type)))

        return initial_change_seqno != self.change_seqno

    def send_cond_change(self):
        if not self._session.is_connected():
            return

        for table in self.tables.values():
            if table.cond_changed:
                self.__send_cond_change(table, table.condition)
                table.cond_changed = False

    def cond_change(self, table_name, cond):
        """Sets the condition for 'table_name' to 'cond', which should be a
        conditional expression suitable for use directly in the OVSDB
        protocol, with the exception that the empty condition []
        matches no rows (instead of matching every row). That is, []
        is equivalent to [False], not to [True].
        """

        table = self.tables.get(table_name)
        if not table:
            raise error.Error('Unknown table "%s"' % table_name)

        if cond == []:
            cond = [False]
        if table.condition != cond:
            table.condition = cond
            table.cond_changed = True

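    # Illustrative sketch of cond_change() (the table and column names here
    # are assumptions, not taken from any particular schema):
    #
    #     idl.cond_change("Logical_Switch_Port", [["up", "==", True]])
    #     idl.cond_change("Logical_Switch_Port", [True])   # every row
    #     idl.cond_change("Logical_Switch_Port", [])       # no rows
    #
    # The new condition is sent to the server on the next Idl.run().
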
    def wait(self, poller):
        """Arranges for poller.block() to wake up when self.run() has something
        to do or when activity occurs on a transaction on 'self'."""
        self._session.wait(poller)
        self._session.recv_wait(poller)

    def has_ever_connected(self):
        """Returns True, if the IDL successfully connected to the remote
        database and retrieved its contents (even if the connection
        subsequently dropped and is in the process of reconnecting). If so,
        then the IDL contains an atomic snapshot of the database's contents
        (but it might be arbitrarily old if the connection dropped).

        Returns False if the IDL has never connected or retrieved the
        database's contents. If so, the IDL is empty."""
        return self.change_seqno != 0

    def force_reconnect(self):
        """Forces the IDL to drop its connection to the database and reconnect.
        In the meantime, the contents of the IDL will not change."""
        self._session.force_reconnect()

    def session_name(self):
        return self._session.get_name()

    def set_lock(self, lock_name):
        """If 'lock_name' is not None, configures the IDL to obtain the named
        lock from the database server and to avoid modifying the database when
        the lock cannot be acquired (that is, when another client has the same
        lock).

        If 'lock_name' is None, drops the locking requirement and releases the
        lock."""
        assert not self.txn
        assert not self._outstanding_txns

        if self.lock_name and (not lock_name or lock_name != self.lock_name):
            # Release previous lock.
            self.__send_unlock_request()
            self.lock_name = None
            self.is_lock_contended = False

        if lock_name and not self.lock_name:
            # Acquire new lock.
            self.lock_name = lock_name
            self.__send_lock_request()

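    # Illustrative sketch of set_lock() ("my_app_lock" is just an example
    # lock name):
    #
    #     idl.set_lock("my_app_lock")
    #     ...
    #     idl.run()
    #     if idl.has_lock:
    #         # Safe to compose and commit transactions.
    #         ...
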
    def notify(self, event, row, updates=None):
        """Hook for implementing create/update/delete notifications

        :param event:   The event that was triggered
        :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
        :param row:     The row as it is after the operation has occurred
        :type row:      Row
        :param updates: For updates, row with only old values of the changed
                        columns
        :type updates:  Row
        """

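    # Illustrative sketch: clients typically subclass Idl and override
    # notify() to react to changes, e.g.:
    #
    #     class MyIdl(ovs.db.idl.Idl):
    #         def notify(self, event, row, updates=None):
    #             if event == ovs.db.idl.ROW_CREATE:
    #                 print("row %s created" % row.uuid)
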
    def __send_cond_change(self, table, cond):
        monitor_cond_change = {table.name: [{"where": cond}]}
        old_uuid = str(self.uuid)
        self.uuid = uuid.uuid1()
        params = [old_uuid, str(self.uuid), monitor_cond_change]
        msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
        self._session.send(msg)

    def __clear(self):
        changed = False

        for table in self.tables.values():
            if table.rows:
                changed = True
            table.rows = custom_index.IndexedRows(table)

        if changed:
            self.change_seqno += 1

    def __update_has_lock(self, new_has_lock):
        if new_has_lock and not self.has_lock:
            if self._monitor_request_id is None:
                self.change_seqno += 1
            else:
                # We're waiting for a monitor reply, so don't signal that the
                # database changed. The monitor reply will increment
                # change_seqno anyhow.
                pass
            self.is_lock_contended = False
        self.has_lock = new_has_lock

    def __do_send_lock_request(self, method):
        self.__update_has_lock(False)
        self._lock_request_id = None
        if self._session.is_connected():
            msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
            msg_id = msg.id
            self._session.send(msg)
        else:
            msg_id = None
        return msg_id

    def __send_lock_request(self):
        self._lock_request_id = self.__do_send_lock_request("lock")

    def __send_unlock_request(self):
        self.__do_send_lock_request("unlock")

    def __parse_lock_reply(self, result):
        self._lock_request_id = None
        got_lock = isinstance(result, dict) and result.get("locked") is True
        self.__update_has_lock(got_lock)
        if not got_lock:
            self.is_lock_contended = True

    def __parse_lock_notify(self, params, new_has_lock):
        if (self.lock_name is not None
                and isinstance(params, (list, tuple))
                and params
                and params[0] == self.lock_name):
            self.__update_has_lock(new_has_lock)
            if not new_has_lock:
                self.is_lock_contended = True

    def __send_db_change_aware(self):
        msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
                                                 [True])
        self._db_change_aware_request_id = msg.id
        self._session.send(msg)

    def __send_monitor_request(self):
        if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
                           self.IDL_S_INITIAL]):
            self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
            method = "monitor_cond"
        else:
            self.state = self.IDL_S_DATA_MONITOR_REQUESTED
            method = "monitor"

        monitor_requests = {}
        for table in self.tables.values():
            columns = []
            for column in table.columns.keys():
                if ((table.name not in self.readonly) or
                        (table.name in self.readonly) and
                        (column not in self.readonly[table.name])):
                    columns.append(column)
            monitor_request = {"columns": columns}
            if method == "monitor_cond" and table.condition != [True]:
                monitor_request["where"] = table.condition
                table.cond_changed = False
            monitor_requests[table.name] = [monitor_request]

        msg = ovs.jsonrpc.Message.create_request(
            method, [self._db.name, str(self.uuid), monitor_requests])
        self._monitor_request_id = msg.id
        self._session.send(msg)

    def __send_server_schema_request(self):
        self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
        msg = ovs.jsonrpc.Message.create_request(
            "get_schema", [self._server_db_name, str(self.uuid)])
        self._server_schema_request_id = msg.id
        self._session.send(msg)

    def __send_server_monitor_request(self):
        self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
        monitor_requests = {}
        table = self.server_tables[self._server_db_table]
        columns = [column for column in table.columns.keys()]
        for column in table.columns.values():
            if not hasattr(column, 'alert'):
                column.alert = True
        table.rows = custom_index.IndexedRows(table)
        table.need_table = False
        table.idl = self
        monitor_request = {"columns": columns}
        monitor_requests[table.name] = [monitor_request]
        msg = ovs.jsonrpc.Message.create_request(
            'monitor', [self._server_db.name,
                        str(self.server_monitor_uuid),
                        monitor_requests])
        self._server_monitor_request_id = msg.id
        self._session.send(msg)

    def __parse_update(self, update, version, tables=None):
        try:
            if not tables:
                self.__do_parse_update(update, version, self.tables)
            else:
                self.__do_parse_update(update, version, tables)
        except error.Error as e:
            vlog.err("%s: error parsing update: %s"
                     % (self._session.get_name(), e))

    def __do_parse_update(self, table_updates, version, tables):
        if not isinstance(table_updates, dict):
            raise error.Error("<table-updates> is not an object",
                              table_updates)

        for table_name, table_update in table_updates.items():
            table = tables.get(table_name)
            if not table:
                raise error.Error('<table-updates> includes unknown '
                                  'table "%s"' % table_name)

            if not isinstance(table_update, dict):
                raise error.Error('<table-update> for table "%s" is not '
                                  'an object' % table_name, table_update)

            for uuid_string, row_update in table_update.items():
                if not ovs.ovsuuid.is_valid_string(uuid_string):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains bad UUID "%s" as member '
                                      'name' % (table_name, uuid_string),
                                      table_update)
                uuid = ovs.ovsuuid.from_string(uuid_string)

                if not isinstance(row_update, dict):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains <row-update> for %s that '
                                      'is not an object'
                                      % (table_name, uuid_string))

                if version == OVSDB_UPDATE2:
                    if self.__process_update2(table, uuid, row_update):
                        self.change_seqno += 1
                    continue

                parser = ovs.db.parser.Parser(row_update, "row-update")
                old = parser.get_optional("old", [dict])
                new = parser.get_optional("new", [dict])
                parser.finish()

                if not old and not new:
                    raise error.Error('<row-update> missing "old" and '
                                      '"new" members', row_update)

                if self.__process_update(table, uuid, old, new):
                    self.change_seqno += 1

    def __process_update2(self, table, uuid, row_update):
        row = table.rows.get(uuid)
        changed = False
        if "delete" in row_update:
            if row:
                del table.rows[uuid]
                self.notify(ROW_DELETE, row)
                changed = True
            else:
                # XXX rate-limit
                vlog.warn("cannot delete missing row %s from table %s"
                          % (uuid, table.name))
        elif "insert" in row_update or "initial" in row_update:
            if row:
                vlog.warn("cannot add existing row %s to table %s"
                          % (uuid, table.name))
                del table.rows[uuid]
            row = self.__create_row(table, uuid)
            if "insert" in row_update:
                row_update = row_update['insert']
            else:
                row_update = row_update['initial']
            self.__add_default(table, row_update)
            changed = self.__row_update(table, row, row_update)
            table.rows[uuid] = row
            if changed:
                self.notify(ROW_CREATE, row)
        elif "modify" in row_update:
            if not row:
                raise error.Error('Modify non-existing row')

            old_row = self.__apply_diff(table, row, row_update['modify'])
            self.notify(ROW_UPDATE, row, Row(self, table, uuid, old_row))
            changed = True
        else:
            raise error.Error('<row-update> unknown operation',
                              row_update)
        return changed

    def __process_update(self, table, uuid, old, new):
        """Returns True if a column changed, False otherwise."""
        row = table.rows.get(uuid)
        changed = False
        if not new:
            # Delete row.
            if row:
                del table.rows[uuid]
                changed = True
                self.notify(ROW_DELETE, row)
            else:
                # XXX rate-limit
                vlog.warn("cannot delete missing row %s from table %s"
                          % (uuid, table.name))
        elif not old:
            # Insert row.
            op = ROW_CREATE
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
            else:
                # XXX rate-limit
                op = ROW_UPDATE
                vlog.warn("cannot add existing row %s to table %s"
                          % (uuid, table.name))
            changed |= self.__row_update(table, row, new)
            if op == ROW_CREATE:
                table.rows[uuid] = row
            if changed:
                self.notify(ROW_CREATE, row)
        else:
            op = ROW_UPDATE
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
                op = ROW_CREATE
                # XXX rate-limit
                vlog.warn("cannot modify missing row %s in table %s"
                          % (uuid, table.name))
            changed |= self.__row_update(table, row, new)
            if op == ROW_CREATE:
                table.rows[uuid] = row
            if changed:
                self.notify(op, row, Row.from_json(self, table, uuid, old))
        return changed

    def __check_server_db(self):
        """Returns True if this is a valid server database, False otherwise."""
        session_name = self.session_name()

        if self._server_db_table not in self.server_tables:
            vlog.info("%s: server does not have %s table in its %s database"
                      % (session_name, self._server_db_table,
                         self._server_db_name))
            return False

        rows = self.server_tables[self._server_db_table].rows

        database = None
        for row in rows.values():
            if self.cluster_id:
                if self.cluster_id in \
                   map(lambda x: str(x)[:4], row.cid):
                    database = row
                    break
            elif row.name == self._db.name:
                database = row
                break

        if not database:
            vlog.info("%s: server does not have %s database"
                      % (session_name, self._db.name))
            return False

        if (database.model == CLUSTERED and
                self._session.get_num_of_remotes() > 1):
            if not database.schema:
                vlog.info('%s: clustered database server has not yet joined '
                          'cluster; trying another server' % session_name)
                return False
            if not database.connected:
                vlog.info('%s: clustered database server is disconnected '
                          'from cluster; trying another server' % session_name)
                return False
            if (self.leader_only and
                    not database.leader):
                vlog.info('%s: clustered database server is not cluster '
                          'leader; trying another server' % session_name)
                return False
            if database.index:
                if database.index[0] < self._min_index:
                    vlog.warn('%s: clustered database server has stale data; '
                              'trying another server' % session_name)
                    return False
                self._min_index = database.index[0]

        return True

    def __column_name(self, column):
        if column.type.key.type == ovs.db.types.UuidType:
            return ovs.ovsuuid.to_json(column.type.key.type.default)
        else:
            return column.type.key.type.default

    def __add_default(self, table, row_update):
        for column in table.columns.values():
            if column.name not in row_update:
                if ((table.name not in self.readonly) or
                        (table.name in self.readonly) and
                        (column.name not in self.readonly[table.name])):
                    if column.type.n_min != 0 and not column.type.is_map():
                        row_update[column.name] = self.__column_name(column)

    def __apply_diff(self, table, row, row_diff):
        old_row = {}
        for column_name, datum_diff_json in row_diff.items():
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum_diff = data.Datum.from_json(column.type, datum_diff_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            old_row[column_name] = row._data[column_name].copy()
            datum = row._data[column_name].diff(datum_diff)
            if datum != row._data[column_name]:
                row._data[column_name] = datum

        return old_row

    def __row_update(self, table, row, row_json):
        changed = False
        for column_name, datum_json in row_json.items():
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum = data.Datum.from_json(column.type, datum_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            if datum != row._data[column_name]:
                row._data[column_name] = datum
                if column.alert:
                    changed = True
            else:
                # Didn't really change but the OVSDB monitor protocol always
                # includes every value in a row.
                pass
        return changed

    def __create_row(self, table, uuid):
        data = {}
        for column in table.columns.values():
            data[column.name] = ovs.db.data.Datum.default(column.type)
        return Row(self, table, uuid, data)

    def __error(self):
        self._session.force_reconnect()

    def __txn_abort_all(self):
        while self._outstanding_txns:
            txn = self._outstanding_txns.popitem()[1]
            txn._status = Transaction.TRY_AGAIN

    def __txn_process_reply(self, msg):
        txn = self._outstanding_txns.pop(msg.id, None)
        if txn:
            txn._process_reply(msg)
            return True


def _uuid_to_row(atom, base):
    if base.ref_table:
        return base.ref_table.rows.get(atom)
    else:
        return atom


def _row_to_uuid(value):
    if isinstance(value, Row):
        return value.uuid
    else:
        return value


@functools.total_ordering
class Row(object):
    """A row within an IDL.

    The client may access the following attributes directly:

    - 'uuid': a uuid.UUID object whose value is the row's database UUID.

    - An attribute for each column in the Row's table, named for the column,
      whose values are as returned by Datum.to_python() for the column's type.

      If some error occurs (e.g. the database server's idea of the column is
      different from the IDL's idea), then the attribute's value is the
      "default" value returned by Datum.default() for the column's type. (It
      is important to know this because the default value may violate
      constraints for the column's type, e.g. the default integer value is 0
      even if column constraints require the column's value to be positive.)

    When a transaction is active, column attributes may also be assigned new
    values. Committing the transaction will then cause the new value to be
    stored into the database.

    *NOTE*: In the current implementation, the value of a column is a *copy*
    of the value in the database. This means that modifying its value
    directly will have no useful effect. For example, the following:
        row.mycolumn["a"] = "b"  # don't do this
    will not change anything in the database, even after commit. To modify
    the column, instead assign the modified column value back to the column:
        d = row.mycolumn
        d["a"] = "b"
        row.mycolumn = d
"""
    def __init__(self, idl, table, uuid, data):
        # All of the explicit references to self.__dict__ below are required
        # to set real attributes without invoking self.__getattr__().
        self.__dict__["uuid"] = uuid

        self.__dict__["_idl"] = idl
        self.__dict__["_table"] = table

        # _data is the committed data. It takes the following values:
        #
        # - A dictionary that maps every column name to a Datum, if the row
        #   exists in the committed form of the database.
        #
        # - None, if this row is newly inserted within the active transaction
        #   and thus has no committed form.
        self.__dict__["_data"] = data

        # _changes describes changes to this row within the active
        # transaction. It takes the following values:
        #
        # - {}, the empty dictionary, if no transaction is active or if the
        #   row has not yet been changed within this transaction.
        #
        # - A dictionary that maps a column name to its new Datum, if an
        #   active transaction changes those columns' values.
        #
        # - A dictionary that maps every column name to a Datum, if the row
        #   is newly inserted within the active transaction.
        #
        # - None, if this transaction deletes this row.
        self.__dict__["_changes"] = {}

        # _mutations describes changes to this row to be handled via a
        # mutate operation on the wire. It takes the following values:
        #
        # - {}, the empty dictionary, if no transaction is active or if the
        #   row has not yet been mutated within this transaction.
        #
        # - A dictionary that contains two keys:
        #
        #   - "_inserts" contains a dictionary that maps column names to
        #     new keys/key-value pairs that should be inserted into the
        #     column
        #   - "_removes" contains a dictionary that maps column names to
        #     the keys/key-value pairs that should be removed from the
        #     column
        #
        # - None, if this transaction deletes this row.
        self.__dict__["_mutations"] = {}

        # A dictionary whose keys are the names of columns that must be
        # verified as prerequisites when the transaction commits. The values
        # in the dictionary are all None.
        self.__dict__["_prereqs"] = {}

    def __lt__(self, other):
        if not isinstance(other, Row):
            return NotImplemented
        return bool(self.__dict__['uuid'] < other.__dict__['uuid'])

    def __eq__(self, other):
        if not isinstance(other, Row):
            return NotImplemented
        return bool(self.__dict__['uuid'] == other.__dict__['uuid'])

    def __hash__(self):
        return int(self.__dict__['uuid'])

    def __str__(self):
        return "{table}({data})".format(
            table=self._table.name,
            data=", ".join("{col}={val}".format(col=c, val=getattr(self, c))
                           for c in sorted(self._table.columns)))

    def __getattr__(self, column_name):
        assert self._changes is not None
        assert self._mutations is not None

        try:
            column = self._table.columns[column_name]
        except KeyError:
            raise AttributeError("%s instance has no attribute '%s'" %
                                 (self.__class__.__name__, column_name))
        datum = self._changes.get(column_name)
        inserts = None
        if '_inserts' in self._mutations.keys():
            inserts = self._mutations['_inserts'].get(column_name)
        removes = None
        if '_removes' in self._mutations.keys():
            removes = self._mutations['_removes'].get(column_name)
        if datum is None:
            if self._data is None:
                if inserts is None:
                    raise AttributeError("%s instance has no attribute '%s'" %
                                         (self.__class__.__name__,
                                          column_name))
                else:
                    datum = data.Datum.from_python(column.type,
                                                   inserts,
                                                   _row_to_uuid)
            elif column_name in self._data:
                datum = self._data[column_name]
                if column.type.is_set():
                    dlist = datum.as_list()
                    if inserts is not None:
                        dlist.extend(list(inserts))
                    if removes is not None:
                        removes_datum = data.Datum.from_python(column.type,
                                                               removes,
                                                               _row_to_uuid)
                        removes_list = removes_datum.as_list()
                        dlist = [x for x in dlist if x not in removes_list]
                    datum = data.Datum.from_python(column.type, dlist,
                                                   _row_to_uuid)
                elif column.type.is_map():
                    dmap = datum.to_python(_uuid_to_row)
                    if inserts is not None:
                        dmap.update(inserts)
                    if removes is not None:
                        for key in removes:
                            if key not in (inserts or {}):
                                dmap.pop(key, None)
                    datum = data.Datum.from_python(column.type, dmap,
                                                   _row_to_uuid)
            else:
                if inserts is None:
                    raise AttributeError("%s instance has no attribute '%s'" %
                                         (self.__class__.__name__,
                                          column_name))
                else:
                    datum = inserts

        return datum.to_python(_uuid_to_row)

    def __setattr__(self, column_name, value):
        assert self._changes is not None
        assert self._idl.txn

        if ((self._table.name in self._idl.readonly) and
                (column_name in self._idl.readonly[self._table.name])):
            vlog.warn("attempting to write to readonly column %s"
                      % column_name)
            return

        column = self._table.columns[column_name]
        try:
            datum = data.Datum.from_python(column.type, value, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        # Remove prior version of the Row from the index if it has the indexed
        # column set, and the column changing is an indexed column
        if hasattr(self, column_name):
            for idx in self._table.rows.indexes.values():
                if column_name in (c.column for c in idx.columns):
                    idx.remove(self)
        self._idl.txn._write(self, column, datum)
        for idx in self._table.rows.indexes.values():
            # Only update the index if indexed columns change
            if column_name in (c.column for c in idx.columns):
                idx.add(self)

    def addvalue(self, column_name, key):
        self._idl.txn._txn_rows[self.uuid] = self
        column = self._table.columns[column_name]
        try:
            data.Datum.from_python(column.type, key, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        inserts = self._mutations.setdefault('_inserts', {})
        column_value = inserts.setdefault(column_name, set())
        column_value.add(key)

    def delvalue(self, column_name, key):
        self._idl.txn._txn_rows[self.uuid] = self
        column = self._table.columns[column_name]
        try:
            data.Datum.from_python(column.type, key, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to delete bad value from column %s (%s)"
                     % (column_name, e))
            return
        removes = self._mutations.setdefault('_removes', {})
        column_value = removes.setdefault(column_name, set())
        column_value.add(key)

    def setkey(self, column_name, key, value):
        self._idl.txn._txn_rows[self.uuid] = self
        column = self._table.columns[column_name]
        try:
            data.Datum.from_python(column.type, {key: value}, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        if self._data and column_name in self._data:
            # Remove existing key/value before updating.
            removes = self._mutations.setdefault('_removes', {})
            column_value = removes.setdefault(column_name, set())
            column_value.add(key)
        inserts = self._mutations.setdefault('_inserts', {})
        column_value = inserts.setdefault(column_name, {})
        column_value[key] = value

    def delkey(self, column_name, key, value=None):
        self._idl.txn._txn_rows[self.uuid] = self
        if value:
            try:
                old_value = data.Datum.to_python(self._data[column_name],
                                                 _uuid_to_row)
            except error.Error:
                return
            if key not in old_value:
                return
            if old_value[key] != value:
                return
        removes = self._mutations.setdefault('_removes', {})
        column_value = removes.setdefault(column_name, set())
        column_value.add(key)
        return

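    # Illustrative sketch of the mutation helpers above, within an active
    # transaction (the column names "external_ids" and "tags" are
    # assumptions, not taken from any particular schema):
    #
    #     txn = ovs.db.idl.Transaction(idl)
    #     row.setkey("external_ids", "owner", "alice")   # map column
    #     row.delkey("external_ids", "stale-key")        # map column
    #     row.addvalue("tags", 42)                       # set column
    #     row.delvalue("tags", 17)                       # set column
    #     txn.commit_block()
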
    @classmethod
    def from_json(cls, idl, table, uuid, row_json):
        data = {}
        for column_name, datum_json in row_json.items():
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s in table %s"
                          % (column_name, table.name))
                continue
            try:
                datum = ovs.db.data.Datum.from_json(column.type, datum_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue
            data[column_name] = datum
        return cls(idl, table, uuid, data)

    def verify(self, column_name):
        """Causes the original contents of column 'column_name' in this row to
        be verified as a prerequisite to completing the transaction. That is,
        if 'column_name' changed in this row (or if this row was deleted)
        between the time that the IDL originally read its contents and the time
        that the transaction commits, then the transaction aborts and
        Transaction.commit() returns Transaction.TRY_AGAIN.

        The intention is that, to ensure that no transaction commits based on
        dirty reads, an application should call Row.verify() on each data item
        read as part of a read-modify-write operation.

        In some cases Row.verify() reduces to a no-op, because the current
        value of the column is already known:

          - If this row is a row created by the current transaction (returned
            by Transaction.insert()).

          - If the column has already been modified within the current
            transaction.

        Because of the latter property, always call Row.verify() *before*
        modifying the column, for a given read-modify-write.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if not self._data or column_name in self._changes:
            return

        self._prereqs[column_name] = None

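    # Illustrative sketch of a read-modify-write guarded by verify()
    # (the column name "external_ids" is an assumption):
    #
    #     row.verify("external_ids")
    #     ids = row.external_ids
    #     ids["owner"] = "alice"
    #     row.external_ids = ids
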
    def delete(self):
        """Deletes this row from its table.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if self._data is None:
            del self._idl.txn._txn_rows[self.uuid]
        else:
            self._idl.txn._txn_rows[self.uuid] = self
        del self._table.rows[self.uuid]
        self.__dict__["_changes"] = None

    def fetch(self, column_name):
        self._idl.txn._fetch(self, column_name)

    def increment(self, column_name):
        """Causes the transaction, when committed, to increment the value of
        'column_name' within this row by 1. 'column_name' must have an integer
        type. After the transaction commits successfully, the client may
        retrieve the final (incremented) value of 'column_name' with
        Transaction.get_increment_new_value().

        The client could accomplish something similar by reading and writing
        and verify()ing columns. However, increment() will never (by itself)
        cause a transaction to fail because of a verify error.

        The intended use is for incrementing the "next_cfg" column in
        the Open_vSwitch table."""
        self._idl.txn._increment(self, column_name)

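    # Illustrative sketch of increment() (assuming 'ovs_row' is the row of
    # the Open_vSwitch table mentioned above):
    #
    #     txn = ovs.db.idl.Transaction(idl)
    #     ovs_row.increment("next_cfg")
    #     if txn.commit_block() == ovs.db.idl.Transaction.SUCCESS:
    #         next_cfg = txn.get_increment_new_value()
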

def _uuid_name_from_uuid(uuid):
    return "row%s" % str(uuid).replace("-", "_")


def _where_uuid_equals(uuid):
    return [["_uuid", "==", ["uuid", str(uuid)]]]


class _InsertedRow(object):
    def __init__(self, op_index):
        self.op_index = op_index
        self.real = None


class Transaction(object):
    """A transaction may modify the contents of a database by modifying the
    values of columns, deleting rows, inserting rows, or adding checks that
    columns in the database have not changed ("verify" operations), through
    Row methods.

    Reading and writing columns and inserting and deleting rows are all
    straightforward. The reasons to verify columns are less obvious.
    Verification is the key to maintaining transactional integrity. Because
    OVSDB handles multiple clients, it can happen that between the time that
    OVSDB client A reads a column and writes a new value, OVSDB client B has
    written that column. Client A's write should not ordinarily overwrite
    client B's, especially if the column in question is a "map" column that
    contains several more or less independent data items. If client A adds a
    "verify" operation before it writes the column, then the transaction fails
    in case client B modifies it first. Client A will then see the new value
    of the column and compose a new transaction based on the new contents
    written by client B.

    When a transaction is complete, which must be before the next call to
    Idl.run(), call Transaction.commit() or Transaction.abort().

    The life-cycle of a transaction looks like this:

    1. Create the transaction and record the initial sequence number:

        seqno = idl.change_seqno
        txn = Transaction(idl)

    2. Modify the database with Row and Transaction methods.

    3. Commit the transaction by calling Transaction.commit(). The first call
       to this function probably returns Transaction.INCOMPLETE. The client
       must keep calling again as long as this remains true, calling Idl.run()
       in between to let the IDL do protocol processing. (If the client
       doesn't have anything else to do in the meantime, it can use
       Transaction.commit_block() to avoid having to loop itself.)

    4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
       to change from the saved 'seqno' (it's possible that it's already
       changed, in which case the client should not wait at all), then start
       over from step 1. Only a call to Idl.run() will change the value
       of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""

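    # Illustrative sketch of the life-cycle described above (the table name
    # "Bridge" is an assumption, not from any particular schema):
    #
    #     seqno = idl.change_seqno
    #     txn = ovs.db.idl.Transaction(idl)
    #     row = txn.insert(idl.tables["Bridge"])
    #     while True:
    #         status = txn.commit()
    #         if status != ovs.db.idl.Transaction.INCOMPLETE:
    #             break
    #         idl.run()
    #     if status == ovs.db.idl.Transaction.TRY_AGAIN:
    #         # Wait for idl.change_seqno to move past 'seqno', then retry.
    #         ...
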
    # Status values that Transaction.commit() can return.

    # Not yet committed or aborted.
    UNCOMMITTED = "uncommitted"
    # Transaction didn't include any changes.
    UNCHANGED = "unchanged"
    # Commit in progress, please wait.
    INCOMPLETE = "incomplete"
    # Transaction.abort() called.
    ABORTED = "aborted"
    # Commit successful.
    SUCCESS = "success"
    # Commit failed because a "verify" operation
    # reported an inconsistency, due to a network
    # problem, or other transient failure. Wait
    # for a change, then try again.
    TRY_AGAIN = "try again"
    # Server hasn't given us the lock yet.
    NOT_LOCKED = "not locked"
    # Commit failed due to a hard error.
    ERROR = "error"

    @staticmethod
    def status_to_string(status):
        """Converts one of the status values that Transaction.commit() can
        return into a human-readable string.

        (The status values are in fact such strings already, so
        there's nothing to do.)"""
        return status

    def __init__(self, idl):
        """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
        A given Idl may only have a single active transaction at a time.

        A Transaction may modify the contents of a database by assigning new
        values to columns (attributes of Row), deleting rows (with
        Row.delete()), or inserting rows (with Transaction.insert()). It may
        also check that columns in the database have not changed with
        Row.verify().

        When a transaction is complete (which must be before the next call to
        Idl.run()), call Transaction.commit() or Transaction.abort()."""
        assert idl.txn is None

        idl.txn = self
        self._request_id = None
        self.idl = idl
        self.dry_run = False
        self._txn_rows = {}
        self._status = Transaction.UNCOMMITTED
        self._error = None
        self._comments = []

        self._inc_row = None
        self._inc_column = None

        self._fetch_requests = []

        self._inserted_rows = {}  # Map from UUID to _InsertedRow

    def add_comment(self, comment):
        """Appends 'comment' to the comments that will be passed to the OVSDB
        server when this transaction is committed. (The comment will be
        committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
        relatively human-readable form.)"""
        self._comments.append(comment)

    def wait(self, poller):
        """Causes poll_block() to wake up if this transaction has completed
        committing."""
        if self._status not in (Transaction.UNCOMMITTED,
                                Transaction.INCOMPLETE):
            poller.immediate_wake()

    def _substitute_uuids(self, json):
        if isinstance(json, (list, tuple)):
            if (len(json) == 2
                    and json[0] == 'uuid'
                    and ovs.ovsuuid.is_valid_string(json[1])):
                uuid = ovs.ovsuuid.from_string(json[1])
                row = self._txn_rows.get(uuid, None)
                if row and row._data is None:
                    return ["named-uuid", _uuid_name_from_uuid(uuid)]
            else:
                return [self._substitute_uuids(elem) for elem in json]
        return json

    def __disassemble(self):
        self.idl.txn = None

        for row in self._txn_rows.values():
            if row._changes is None:
                # If we add the deleted row back to rows with _changes == None
                # then __getattr__ will not work for the indexes
                row.__dict__["_changes"] = {}
                row.__dict__["_mutations"] = {}
                row._table.rows[row.uuid] = row
            elif row._data is None:
                del row._table.rows[row.uuid]
            row.__dict__["_changes"] = {}
            row.__dict__["_mutations"] = {}
            row.__dict__["_prereqs"] = {}
        self._txn_rows = {}

    def commit(self):
        """Attempts to commit this transaction. Returns the status of the
        commit operation, one of the following constants:

        Transaction.INCOMPLETE:

            The transaction is in progress, but not yet complete. The caller
            should call again later, after calling Idl.run() to let the
            IDL do OVSDB protocol processing.

        Transaction.UNCHANGED:

            The transaction is complete. (It didn't actually change the
            database, so the IDL didn't send any request to the database
            server.)

        Transaction.ABORTED:

            The caller previously called Transaction.abort().

        Transaction.SUCCESS:

            The transaction was successful. The update made by the
            transaction (and possibly other changes made by other database
            clients) should already be visible in the IDL.

        Transaction.TRY_AGAIN:

            The transaction failed for some transient reason, e.g. because a
            "verify" operation reported an inconsistency or due to a network
            problem. The caller should wait for a change to the database,
            then compose a new transaction, and commit the new transaction.

            Use Idl.change_seqno to wait for a change in the database. It is
            important to use its value *before* the initial call to
            Transaction.commit() as the baseline for this purpose, because
            the change that one should wait for can happen after the initial
            call but before the call that returns Transaction.TRY_AGAIN, and
            using some other baseline value in that situation could cause an
            indefinite wait if the database rarely changes.

        Transaction.NOT_LOCKED:

            The transaction failed because the IDL has been configured to
            require a database lock (with Idl.set_lock()) but didn't
            get it yet or has already lost it.

        Committing a transaction rolls back all of the changes that it made to
        the IDL's copy of the database. If the transaction commits
        successfully, then the database server will send an update and, thus,
        the IDL will be updated with the committed changes."""
1456 # The status can only change if we're the active transaction.
1457 # (Otherwise, our status will change only in Idl.run().)
1458 if self != self.idl.txn:
1459 return self._status
1460
1461 # If we need a lock but don't have it, give up quickly.
9614403d 1462 if self.idl.lock_name and not self.idl.has_lock:
1463 self._status = Transaction.NOT_LOCKED
1464 self.__disassemble()
1465 return self._status
1466
1467 operations = [self.idl._db.name]
1468
1469 # Assert that we have the required lock (avoiding a race).
1470 if self.idl.lock_name:
1471 operations.append({"op": "assert",
1472 "lock": self.idl.lock_name})
1473
1474 # Add prerequisites and declarations of new rows.
0c4d144a 1475 for row in self._txn_rows.values():
1476 if row._prereqs:
1477 rows = {}
1478 columns = []
1479 for column_name in row._prereqs:
1480 columns.append(column_name)
1481 rows[column_name] = row._data[column_name].to_json()
1482 operations.append({"op": "wait",
1483 "table": row._table.name,
1484 "timeout": 0,
1485 "where": _where_uuid_equals(row.uuid),
1486 "until": "==",
1487 "columns": columns,
1488 "rows": [rows]})
1489
1490 # Add updates.
1491 any_updates = False
0c4d144a 1492 for row in self._txn_rows.values():
1493 if row._changes is None:
1494 if row._table.is_root:
1495 operations.append({"op": "delete",
1496 "table": row._table.name,
1497 "where": _where_uuid_equals(row.uuid)})
1498 any_updates = True
1499 else:
1500 # Let ovsdb-server decide whether to really delete it.
1501 pass
1502 elif row._changes:
1503 op = {"table": row._table.name}
1504 if row._data is None:
1505 op["op"] = "insert"
1506 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
1507 any_updates = True
1508
1509 op_index = len(operations) - 1
1510 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
1511 else:
1512 op["op"] = "update"
1513 op["where"] = _where_uuid_equals(row.uuid)
1514
1515 row_json = {}
1516 op["row"] = row_json
1517
0c4d144a 1518 for column_name, datum in row._changes.items():
8cdf0349 1519 if row._data is not None or not datum.is_default():
26bb0f31 1520 row_json[column_name] = (
897c8064 1521 self._substitute_uuids(datum.to_json()))
1522
1523 # If anything really changed, consider it an update.
1524 # We can't suppress not-really-changed values earlier
1525 # or transactions would become nonatomic (see the big
1526 # comment inside Transaction._write()).
1527 if (not any_updates and row._data is not None and
897c8064 1528 row._data[column_name] != datum):
1529 any_updates = True
1530
1531 if row._data is None or row_json:
1532 operations.append(op)
1533 if row._mutations:
1534 addop = False
1535 op = {"table": row._table.name}
1536 op["op"] = "mutate"
1537 if row._data is None:
1538 # New row
1539 op["where"] = self._substitute_uuids(
1540 _where_uuid_equals(row.uuid))
1541 else:
1542 # Existing row
1543 op["where"] = _where_uuid_equals(row.uuid)
1544 op["mutations"] = []
1545 if '_removes' in row._mutations.keys():
0c4d144a 1546 for col, dat in row._mutations['_removes'].items():
1547 column = row._table.columns[col]
1548 if column.type.is_map():
1549 opdat = ["set"]
330b9c9c 1550 opdat.append(list(dat))
1551 else:
1552 opdat = ["set"]
1553 inner_opdat = []
1554 for ele in dat:
1555 try:
1556 datum = data.Datum.from_python(column.type,
1557 ele, _row_to_uuid)
1558 except error.Error:
1559 return
1560 inner_opdat.append(
1561 self._substitute_uuids(datum.to_json()))
1562 opdat.append(inner_opdat)
1563 mutation = [col, "delete", opdat]
1564 op["mutations"].append(mutation)
1565 addop = True
1566 if '_inserts' in row._mutations.keys():
0c4d144a 1567 for col, val in row._mutations['_inserts'].items():
1568 column = row._table.columns[col]
1569 if column.type.is_map():
1570 datum = data.Datum.from_python(column.type, val,
1571 _row_to_uuid)
9435b0b8 1572 opdat = self._substitute_uuids(datum.to_json())
1573 else:
1574 opdat = ["set"]
1575 inner_opdat = []
330b9c9c 1576 for ele in val:
1577 try:
1578 datum = data.Datum.from_python(column.type,
1579 ele, _row_to_uuid)
1580 except error.Error:
1581 return
1582 inner_opdat.append(
1583 self._substitute_uuids(datum.to_json()))
1584 opdat.append(inner_opdat)
1585 mutation = [col, "insert", opdat]
1586 op["mutations"].append(mutation)
1587 addop = True
1588 if addop:
1589 operations.append(op)
1590 any_updates = True
8cdf0349 1591
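# Illustrative sketch of what populates row._mutations; setkey()/delkey()
# are the partial-update helpers on Row, and the column/key names are
# hypothetical:
#
#     bridge_row.setkey("external_ids", "owner", "example")   # -> '_inserts'
#     bridge_row.delkey("external_ids", "stale-key")           # -> '_removes'
#
# The loop above folds both into a single {"op": "mutate"} operation with
# ["external_ids", "insert", ...] and ["external_ids", "delete", ...]
# entries in its "mutations" list.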
1592 if self._fetch_requests:
1593 for fetch in self._fetch_requests:
1594 fetch["index"] = len(operations) - 1
1595 operations.append({"op": "select",
1596 "table": fetch["row"]._table.name,
1597 "where": self._substitute_uuids(
1598 _where_uuid_equals(fetch["row"].uuid)),
1599 "columns": [fetch["column_name"]]})
1600 any_updates = True
1601
8cdf0349 1602 # Add increment.
94fbe1aa 1603 if self._inc_row and any_updates:
1604 self._inc_index = len(operations) - 1
1605
1606 operations.append({"op": "mutate",
94fbe1aa 1607 "table": self._inc_row._table.name,
26bb0f31 1608 "where": self._substitute_uuids(
94fbe1aa 1609 _where_uuid_equals(self._inc_row.uuid)),
1610 "mutations": [[self._inc_column, "+=", 1]]})
1611 operations.append({"op": "select",
94fbe1aa 1612 "table": self._inc_row._table.name,
26bb0f31 1613 "where": self._substitute_uuids(
94fbe1aa 1614 _where_uuid_equals(self._inc_row.uuid)),
1615 "columns": [self._inc_column]})
1616
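# Roughly what the two operations appended above look like for a hypothetical
# increment of "next_cfg" on the Open_vSwitch table (UUID elided):
#
#     {"op": "mutate", "table": "Open_vSwitch",
#      "where": [["_uuid", "==", ["uuid", "..."]]],
#      "mutations": [["next_cfg", "+=", 1]]}
#     {"op": "select", "table": "Open_vSwitch",
#      "where": [["_uuid", "==", ["uuid", "..."]]],
#      "columns": ["next_cfg"]}
#
# __process_inc_reply() later checks the mutate's "count" and reads the
# single selected row to recover the incremented value.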
1617 # Add comment.
1618 if self._comments:
1619 operations.append({"op": "comment",
1620 "comment": "\n".join(self._comments)})
1621
1622 # Dry run?
1623 if self.dry_run:
1624 operations.append({"op": "abort"})
1625
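# Note: setting txn.dry_run = True before commit() therefore turns the
# transaction into a validation pass; ovsdb-server evaluates the preceding
# operations and then aborts, so nothing is made durable.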
1626 if not any_updates:
1627 self._status = Transaction.UNCHANGED
1628 else:
1629 msg = ovs.jsonrpc.Message.create_request("transact", operations)
1630 self._request_id = msg.id
1631 if not self.idl._session.send(msg):
1632 self.idl._outstanding_txns[self._request_id] = self
1633 self._status = Transaction.INCOMPLETE
1634 else:
854a94d9 1635 self._status = Transaction.TRY_AGAIN
1636
1637 self.__disassemble()
1638 return self._status
1639
1640 def commit_block(self):
1641 """Attempts to commit this transaction, blocking until the commit
1642 either succeeds or fails. Returns the final commit status, which may
1643 be any Transaction.* value other than Transaction.INCOMPLETE.
1644
1645 This function calls Idl.run() on this transaction's IDL, so it may
1646 cause Idl.change_seqno to change."""
1647 while True:
1648 status = self.commit()
1649 if status != Transaction.INCOMPLETE:
1650 return status
1651
1652 self.idl.run()
1653
1654 poller = ovs.poller.Poller()
1655 self.idl.wait(poller)
1656 self.wait(poller)
1657 poller.block()
1658
1659 def get_increment_new_value(self):
1660 """Returns the final (incremented) value of the column in this
1661 transaction that was set to be incremented by Row.increment. This
1662 transaction must have committed successfully."""
1663 assert self._status == Transaction.SUCCESS
1664 return self._inc_new_value
1665
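# A hedged sketch of the increment workflow; the table/column names follow
# the standard Open vSwitch schema, and Row.increment() (which funnels into
# Transaction._increment()) is assumed to be available:
#
#     txn = Transaction(idl)
#     ovs_row = next(iter(idl.tables["Open_vSwitch"].rows.values()))
#     ovs_row.increment("next_cfg")
#     if txn.commit_block() == Transaction.SUCCESS:
#         new_next_cfg = txn.get_increment_new_value()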
1666 def abort(self):
1667 """Aborts this transaction. If Transaction.commit() has already been
1668 called then the transaction might get committed anyhow."""
1669 self.__disassemble()
1670 if self._status in (Transaction.UNCOMMITTED,
1671 Transaction.INCOMPLETE):
1672 self._status = Transaction.ABORTED
1673
1674 def get_error(self):
1675 """Returns a string representing this transaction's current status,
1676 suitable for use in log messages."""
1677 if self._status != Transaction.ERROR:
1678 return Transaction.status_to_string(self._status)
1679 elif self._error:
1680 return self._error
1681 else:
1682 return "no error details available"
1683
1684 def __set_error_json(self, json):
1685 if self._error is None:
1686 self._error = ovs.json.to_string(json)
1687
1688 def get_insert_uuid(self, uuid):
1689 """Finds and returns the permanent UUID that the database assigned to a
1690 newly inserted row, given the UUID that Transaction.insert() assigned
1691 locally to that row.
1692
1693 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1694 or if it was assigned by that function and then deleted by Row.delete()
1695 within the same transaction. (Rows that are inserted and then deleted
1696 within a single transaction are never sent to the database server, so
1697 it never assigns them a permanent UUID.)
1698
1699 This transaction must have completed successfully."""
1700 assert self._status in (Transaction.SUCCESS,
1701 Transaction.UNCHANGED)
1702 inserted_row = self._inserted_rows.get(uuid)
1703 if inserted_row:
1704 return inserted_row.real
1705 return None
1706
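# A minimal sketch of mapping a provisional UUID to the database-assigned
# one (the "Bridge" table name is an assumption):
#
#     txn = Transaction(idl)
#     row = txn.insert(idl.tables["Bridge"])
#     if txn.commit_block() == Transaction.SUCCESS:
#         real_uuid = txn.get_insert_uuid(row.uuid)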
1707 def _increment(self, row, column):
1708 assert not self._inc_row
1709 self._inc_row = row
1710 self._inc_column = column
1711
80c12152 1712 def _fetch(self, row, column_name):
a0631d92 1713 self._fetch_requests.append({"row": row, "column_name": column_name})
80c12152 1714
1715 def _write(self, row, column, datum):
1716 assert row._changes is not None
a59912a0 1717 assert row._mutations is not None
1718
1719 txn = row._idl.txn
1720
1721 # If this is a write-only column and the datum being written is the
1722 # same as the one already there, just skip the update entirely. This
1723 # is worth optimizing because we have a lot of columns that get
1724 # periodically refreshed into the database but don't actually change
1725 # that often.
1726 #
1727 # We don't do this for read/write columns because that would break
1728 # atomicity of transactions--some other client might have written a
1729 # different value in that column since we read it. (But if a whole
1730 # transaction only does writes of existing values, without making any
1731 # real changes, we will drop the whole transaction later in
1732 # ovsdb_idl_txn_commit().)
1733 if (not column.alert and row._data and
1734 row._data.get(column.name) == datum):
1735 new_value = row._changes.get(column.name)
1736 if new_value is None or new_value == datum:
1737 return
1738
1739 txn._txn_rows[row.uuid] = row
1740 if '_inserts' in row._mutations:
1741 row._mutations['_inserts'].pop(column.name, None)
1742 if '_removes' in row._mutations:
1743 row._mutations['_removes'].pop(column.name, None)
1744 row._changes[column.name] = datum.copy()
1745
1746 def insert(self, table, new_uuid=None):
1747 """Inserts and returns a new row in 'table', which must be one of the
1748 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1749
1750 The new row is assigned a provisional UUID. If 'new_uuid' is None then
1751 one is randomly generated; otherwise 'new_uuid' should specify a
1752 randomly generated uuid.UUID not otherwise in use. ovsdb-server will
1753 assign a different UUID when the transaction is committed, but the IDL
1754 will replace any uses of the provisional UUID in the data to be
1755 committed with the UUID assigned by ovsdb-server."""
1756 assert self._status == Transaction.UNCOMMITTED
1757 if new_uuid is None:
1758 new_uuid = uuid.uuid4()
1759 row = Row(self.idl, table, new_uuid, None)
1760 table.rows[row.uuid] = row
1761 self._txn_rows[row.uuid] = row
1762 return row
1763
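# A short sketch of supplying the provisional UUID explicitly, which lets
# the caller refer to the row before the commit (names are hypothetical):
#
#     provisional = uuid.uuid4()
#     row = txn.insert(idl.tables["Bridge"], new_uuid=provisional)
#     assert row.uuid == provisional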
1764 def _process_reply(self, msg):
1765 if msg.type == ovs.jsonrpc.Message.T_ERROR:
1766 self._status = Transaction.ERROR
da2d45c6 1767 elif not isinstance(msg.result, (list, tuple)):
8cdf0349 1768 # XXX rate-limit
3a656eaf 1769 vlog.warn('reply to "transact" is not JSON array')
1770 else:
1771 hard_errors = False
1772 soft_errors = False
1773 lock_errors = False
1774
1775 ops = msg.result
1776 for op in ops:
1777 if op is None:
1778 # This isn't an error in itself but indicates that some
1779 # prior operation failed, so make sure that we know about
1780 # it.
1781 soft_errors = True
da2d45c6 1782 elif isinstance(op, dict):
1783 error = op.get("error")
1784 if error is not None:
1785 if error == "timed out":
1786 soft_errors = True
1787 elif error == "not owner":
1788 lock_errors = True
1789 elif error == "aborted":
1790 pass
1791 else:
1792 hard_errors = True
1793 self.__set_error_json(op)
1794 else:
1795 hard_errors = True
1796 self.__set_error_json(op)
1797 # XXX rate-limit
3a656eaf 1798 vlog.warn("operation reply is not JSON null or object")
1799
1800 if not soft_errors and not hard_errors and not lock_errors:
94fbe1aa 1801 if self._inc_row and not self.__process_inc_reply(ops):
8cdf0349 1802 hard_errors = True
1803 if self._fetch_requests:
1804 if self.__process_fetch_reply(ops):
1805 self.idl.change_seqno += 1
1806 else:
1807 hard_errors = True
8cdf0349 1808
0c4d144a 1809 for insert in self._inserted_rows.values():
1810 if not self.__process_insert_reply(insert, ops):
1811 hard_errors = True
1812
1813 if hard_errors:
1814 self._status = Transaction.ERROR
1815 elif lock_errors:
1816 self._status = Transaction.NOT_LOCKED
1817 elif soft_errors:
854a94d9 1818 self._status = Transaction.TRY_AGAIN
1819 else:
1820 self._status = Transaction.SUCCESS
1821
1822 @staticmethod
1823 def __check_json_type(json, types, name):
1824 if not json:
1825 # XXX rate-limit
3a656eaf 1826 vlog.warn("%s is missing" % name)
8cdf0349 1827 return False
da2d45c6 1828 elif not isinstance(json, tuple(types)):
8cdf0349 1829 # XXX rate-limit
3a656eaf 1830 vlog.warn("%s has unexpected type %s" % (name, type(json)))
1831 return False
1832 else:
1833 return True
1834
1835 def __process_fetch_reply(self, ops):
1836 update = False
1837 for fetch_request in self._fetch_requests:
1838 row = fetch_request["row"]
1839 column_name = fetch_request["column_name"]
1840 index = fetch_request["index"]
1841 table = row._table
1842
1843 select = ops[index]
1844 fetched_rows = select.get("rows")
1845 if not Transaction.__check_json_type(fetched_rows, (list, tuple),
1846 '"select" reply "rows"'):
1847 return False
1848 if len(fetched_rows) != 1:
1849 # XXX rate-limit
1850 vlog.warn('"select" reply "rows" has %d elements '
e8049bc5 1851 'instead of 1' % len(fetched_rows))
1852 continue
1853 fetched_row = fetched_rows[0]
1854 if not Transaction.__check_json_type(fetched_row, (dict,),
1855 '"select" reply row'):
1856 continue
1857
1858 column = table.columns.get(column_name)
1859 datum_json = fetched_row.get(column_name)
a59912a0 1860 datum = data.Datum.from_json(column.type, datum_json)
1861
1862 row._data[column_name] = datum
1863 update = True
1864
1865 return update
1866
1867 def __process_inc_reply(self, ops):
1868 if self._inc_index + 2 > len(ops):
1869 # XXX rate-limit
1870 vlog.warn("reply does not contain enough operations for "
1871 "increment (has %d, needs %d)" %
1872 (len(ops), self._inc_index + 2))
# Bail out here; otherwise the indexing into 'ops' below would fail.
return False
1873
1874 # We know that this is a JSON object because the loop in
1875 # __process_reply() already checked.
1876 mutate = ops[self._inc_index]
1877 count = mutate.get("count")
0c4d144a 1878 if not Transaction.__check_json_type(count, (int,),
1879 '"mutate" reply "count"'):
1880 return False
1881 if count != 1:
1882 # XXX rate-limit
3a656eaf 1883 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1884 return False
1885
1886 select = ops[self._inc_index + 1]
1887 rows = select.get("rows")
1888 if not Transaction.__check_json_type(rows, (list, tuple),
1889 '"select" reply "rows"'):
1890 return False
1891 if len(rows) != 1:
1892 # XXX rate-limit
1893 vlog.warn('"select" reply "rows" has %d elements '
1894 'instead of 1' % len(rows))
1895 return False
1896 row = rows[0]
1897 if not Transaction.__check_json_type(row, (dict,),
1898 '"select" reply row'):
1899 return False
1900 column = row.get(self._inc_column)
0c4d144a 1901 if not Transaction.__check_json_type(column, (int,),
1902 '"select" reply inc column'):
1903 return False
1904 self._inc_new_value = column
1905 return True
1906
1907 def __process_insert_reply(self, insert, ops):
1908 if insert.op_index >= len(ops):
1909 # XXX rate-limit
1910 vlog.warn("reply does not contain enough operations "
1911 "for insert (has %d, needs %d)"
1912 % (len(ops), insert.op_index))
1913 return False
1914
1915 # We know that this is a JSON object because the loop in
1916 # __process_reply() already checked.
1917 reply = ops[insert.op_index]
1918 json_uuid = reply.get("uuid")
1919 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1920 '"insert" reply "uuid"'):
1921 return False
1922
1923 try:
1924 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1925 except error.Error:
1926 # XXX rate-limit
3a656eaf 1927 vlog.warn('"insert" reply "uuid" is not a JSON UUID')
8cdf0349 1928 return False
bf6ec045 1929
1930 insert.real = uuid_
1931 return True
1932
1933
1934 class SchemaHelper(object):
1935 """IDL Schema helper.
1936
1937 This class encapsulates the logic required to generate schemas suitable
1938 for creating 'ovs.db.idl.Idl' objects. Clients should register columns
1939 they are interested in using register_columns(). When finished, the
1940 get_idl_schema() function may be called.
1941
1942 The location on disk of the schema used may be found in the
1943 'schema_location' variable."""
1944
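# A hedged end-to-end sketch; the table and column names follow the standard
# Open vSwitch schema and are assumptions:
#
#     helper = SchemaHelper()                    # default vswitch.ovsschema
#     helper.register_columns("Bridge", ["name", "ports"])
#     helper.register_table("Open_vSwitch")
#     schema = helper.get_idl_schema()
#
# 'schema' (or, depending on the Idl constructor in use, the helper itself)
# is then passed to ovs.db.idl.Idl together with the remote, e.g. a
# "unix:..." socket path.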
1945 def __init__(self, location=None, schema_json=None):
1946 """Creates a new Schema object.
ad0991e6 1947
1948 'location': file path to the ovs schema; None means the default location.
1949 'schema_json': the schema as an in-memory JSON representation.
1950 """
1951
1952 if location and schema_json:
1953 raise ValueError("both location and schema_json can't be "
1954 "specified. it's ambiguous.")
1955 if schema_json is None:
1956 if location is None:
1957 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1958 schema_json = ovs.json.from_file(location)
ad0991e6 1959
e15ad8e6 1960 self.schema_json = schema_json
ad0991e6 1961 self._tables = {}
80c12152 1962 self._readonly = {}
bf42f674 1963 self._all = False
ad0991e6 1964
80c12152 1965 def register_columns(self, table, columns, readonly=[]):
1966 """Registers interest in the given 'columns' of 'table'. Future calls
1967 to get_idl_schema() will include 'table':column for each column in
1968 'columns'. This function automatically avoids adding duplicate entries
1969 to the schema.
1970 A subset of 'columns' can be specified as 'readonly'. The readonly
1971 columns are not replicated but can be fetched on-demand by the user
1972 with Row.fetch().
1973
1974 'table' must be a string.
1975 'columns' must be a list of strings.
80c12152 1976 'readonly' must be a list of strings.
1977 """
1978
0c4d144a 1979 assert isinstance(table, str)
da2d45c6 1980 assert isinstance(columns, list)
1981
1982 columns = set(columns) | self._tables.get(table, set())
1983 self._tables[table] = columns
80c12152 1984 self._readonly[table] = readonly
ad0991e6 1985
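# A hedged sketch of the readonly/fetch split; the column names follow the
# standard Open vSwitch schema and are assumptions:
#
#     helper.register_columns("Interface", ["name", "statistics"],
#                             readonly=["statistics"])
#
# "statistics" is then not replicated with ordinary updates; a client that
# wants a fresh copy requests it inside a transaction via Row.fetch(), and
# the value shows up in the local replica once the reply is processed:
#
#     txn = Transaction(idl)
#     iface_row.fetch("statistics")
#     txn.commit_block()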
1986 def register_table(self, table):
1987 """Registers interest in the given all columns of 'table'. Future calls
1988 to get_idl_schema() will include all columns of 'table'.
1989
1990 'table' must be a string
1991 """
0c4d144a 1992 assert isinstance(table, str)
1993 self._tables[table] = set() # empty set means all columns in the table
1994
1995 def register_all(self):
1996 """Registers interest in every column of every table."""
1997 self._all = True
1998
1999 def get_idl_schema(self):
2000 """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
2001 object based on columns registered using the register_columns()
2002 function."""
2003
2004 schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
2005 self.schema_json = None
ad0991e6 2006
2007 if not self._all:
2008 schema_tables = {}
0c4d144a 2009 for table, columns in self._tables.items():
2010 schema_tables[table] = (
2011 self._keep_table_columns(schema, table, columns))
2012
2013 schema.tables = schema_tables
80c12152 2014 schema.readonly = self._readonly
2015 return schema
2016
2017 def _keep_table_columns(self, schema, table_name, columns):
2018 assert table_name in schema.tables
2019 table = schema.tables[table_name]
2020
2021 if not columns:
2022 # empty set means all columns in the table
2023 return table
2024
2025 new_columns = {}
2026 for column_name in columns:
0c4d144a 2027 assert isinstance(column_name, str)
2028 assert column_name in table.columns
2029
2030 new_columns[column_name] = table.columns[column_name]
2031
2032 table.columns = new_columns
2033 return table