# Source: python/ovs/db/idl.py from ovs.git
# (gitweb blob, commit d1d91552b1b86b42ee7d9b2e14a387e5ceca77db)
1 # Copyright (c) 2009, 2010, 2011, 2012, 2013, 2016 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import functools
16 import uuid
17
18 import ovs.db.data as data
19 import ovs.db.parser
20 import ovs.db.schema
21 import ovs.jsonrpc
22 import ovs.ovsuuid
23 import ovs.poller
24 import ovs.vlog
25 from ovs.db import custom_index
26 from ovs.db import error
27
28 import six
29
# Module-level logger for the IDL.
vlog = ovs.vlog.Vlog("idl")

__pychecker__ = 'no-classattr no-objattrs'

# Event names passed to Idl.notify() to describe what happened to a row.
ROW_CREATE = "create"
ROW_UPDATE = "update"
ROW_DELETE = "delete"

# Wire-format versions of <table-updates>: OVSDB_UPDATE corresponds to the
# "update" notification (old/new rows), OVSDB_UPDATE2 to "update2" (diffs).
OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1

# Value of the 'model' column in the _Server database's Database table that
# identifies a clustered database.
CLUSTERED = "clustered"
43
class Idl(object):
    """Open vSwitch Database Interface Definition Language (OVSDB IDL).

    The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
    requests to an OVSDB database server and parses the responses, converting
    raw JSON into data structures that are easier for clients to digest.

    The IDL also assists with issuing database transactions.  The client
    creates a transaction, manipulates the IDL data structures, and commits or
    aborts the transaction.  The IDL then composes and issues the necessary
    JSON-RPC requests and reports to the client whether the transaction
    completed successfully.

    The client is allowed to access the following attributes directly, in a
    read-only fashion:

    - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
      to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
      annotated with a new attribute 'rows', which is a dict from a uuid.UUID
      to a Row object.

      The client may directly read and write the Row objects referenced by the
      'rows' map values.  Refer to Row for more details.

    - 'change_seqno': A number that represents the IDL's state.  When the IDL
      is updated (by Idl.run()), its value changes.  The sequence number can
      occasionally change even if the database does not.  This happens if the
      connection to the database drops and reconnects, which causes the
      database contents to be reloaded even if they didn't change.  (It could
      also happen if the database server sends out a "change" that reflects
      what the IDL already thought was in the database.  The database server is
      not supposed to do that, but bugs could in theory cause it to do so.)

    - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
      if no lock is configured.

    - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
      lock, and False otherwise.

      Locking and unlocking happens asynchronously from the database client's
      point of view, so the information is only useful for optimization
      (e.g. if the client doesn't have the lock then there's no point in trying
      to write to the database).

    - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
      the database server has indicated that some other client already owns the
      requested lock, and False otherwise.

    - 'txn': The ovs.db.idl.Transaction object for the database transaction
      currently being constructed, if there is one, or None otherwise.
    """

    # Connection state machine: _Server schema and monitor are set up first,
    # then the data database is monitored ("monitor_cond" when supported,
    # plain "monitor" as fallback).
    IDL_S_INITIAL = 0
    IDL_S_SERVER_SCHEMA_REQUESTED = 1
    IDL_S_SERVER_MONITOR_REQUESTED = 2
    IDL_S_DATA_MONITOR_REQUESTED = 3
    IDL_S_DATA_MONITOR_COND_REQUESTED = 4

    def __init__(self, remote, schema_helper, probe_interval=None,
                 leader_only=True):
        """Creates and returns a connection to the database named 'db_name' on
        'remote', which should be in a form acceptable to
        ovs.jsonrpc.session.open().  The connection will maintain an in-memory
        replica of the remote database.

        'remote' can be comma separated multiple remotes and each remote
        should be in a form acceptable to ovs.jsonrpc.session.open().

        'schema_helper' should be an instance of the SchemaHelper class which
        generates schema for the remote database.  The caller may have cut it
        down by removing tables or columns that are not of interest.  The IDL
        will only replicate the tables and columns that remain.  The caller may
        also add an attribute named 'alert' to selected remaining columns,
        setting its value to False; if so, then changes to those columns will
        not be considered changes to the database for the purpose of the return
        value of Idl.run() and Idl.change_seqno.  This is useful for columns
        that the IDL's client will write but not read.

        As a convenience to users, 'schema' may also be an instance of the
        SchemaHelper class.

        The IDL uses and modifies 'schema' directly.

        If 'leader_only' is set to True (default value) the IDL will only
        monitor and transact with the leader of the cluster.

        If "probe_interval" is zero it disables the connection keepalive
        feature.  If non-zero the value will be forced to at least 1000
        milliseconds.  If None it will just use the default value in OVS.
        """

        assert isinstance(schema_helper, SchemaHelper)
        schema = schema_helper.get_idl_schema()

        self.tables = schema.tables
        self.readonly = schema.readonly
        self._db = schema
        remotes = self._parse_remotes(remote)
        self._session = ovs.jsonrpc.Session.open_multiple(
            remotes, probe_interval=probe_interval)
        self._monitor_request_id = None
        self._last_seqno = None
        self.change_seqno = 0
        self.uuid = uuid.uuid1()

        # Server monitor: state for tracking the special _Server database,
        # which describes the server's databases (including cluster status).
        self._server_schema_request_id = None
        self._server_monitor_request_id = None
        self._db_change_aware_request_id = None
        self._server_db_name = '_Server'
        self._server_db_table = 'Database'
        self.server_tables = None
        self._server_db = None
        self.server_monitor_uuid = uuid.uuid1()
        self.leader_only = leader_only
        self.cluster_id = None
        self._min_index = 0  # Highest Raft log index seen; guards stale data.

        self.state = self.IDL_S_INITIAL

        # Database locking.
        self.lock_name = None          # Name of lock we need, None if none.
        self.has_lock = False          # Has db server said we have the lock?
        self.is_lock_contended = False  # Has db server said we can't get lock?
        self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.

        # Transaction support.
        self.txn = None
        self._outstanding_txns = {}

        for table in six.itervalues(schema.tables):
            for column in six.itervalues(table.columns):
                if not hasattr(column, 'alert'):
                    column.alert = True
            table.need_table = False
            table.rows = custom_index.IndexedRows(table)
            table.idl = self
            table.condition = [True]
            table.cond_changed = False

    def _parse_remotes(self, remote):
        # If remote is -
        # "tcp:10.0.0.1:6641,unix:/tmp/db.sock,t,s,tcp:10.0.0.2:6642"
        # this function returns
        # ["tcp:10.0.0.1:6641", "unix:/tmp/db.sock,t,s", "tcp:10.0.0.2:6642"]
        #
        # A comma-separated piece without a ":" is treated as a continuation
        # of the previous remote (e.g. extra options on a unix socket).
        remotes = []
        for r in remote.split(','):
            if remotes and r.find(":") == -1:
                remotes[-1] += "," + r
            else:
                remotes.append(r)
        return remotes

    def set_cluster_id(self, cluster_id):
        """Set the id of the cluster that this idl must connect to."""
        self.cluster_id = cluster_id
        if self.state != self.IDL_S_INITIAL:
            self.force_reconnect()

    def index_create(self, table, name):
        """Create a named multi-column index on a table"""
        return self.tables[table].rows.index_create(name)

    def index_irange(self, table, name, start, end):
        """Return items in a named index between start/end inclusive"""
        return self.tables[table].rows.indexes[name].irange(start, end)

    def index_equal(self, table, name, value):
        """Return items in a named index matching a value"""
        return self.tables[table].rows.indexes[name].irange(value, value)

    def close(self):
        """Closes the connection to the database.  The IDL will no longer
        update."""
        self._session.close()

    def run(self):
        """Processes a batch of messages from the database server.  Returns
        True if the database as seen through the IDL changed, False if it did
        not change.  The initial fetch of the entire contents of the remote
        database is considered to be one kind of change.  If the IDL has been
        configured to acquire a database lock (with Idl.set_lock()), then
        successfully acquiring the lock is also considered to be a change.

        This function can return occasional false positives, that is, report
        that the database changed even though it didn't.  This happens if the
        connection to the database drops and reconnects, which causes the
        database contents to be reloaded even if they didn't change.  (It could
        also happen if the database server sends out a "change" that reflects
        what we already thought was in the database, but the database server is
        not supposed to do that.)

        As an alternative to checking the return value, the client may check
        for changes in self.change_seqno."""
        assert not self.txn
        initial_change_seqno = self.change_seqno

        self.send_cond_change()
        self._session.run()
        i = 0
        # Process at most 50 messages per call so a busy server cannot
        # starve the caller.
        while i < 50:
            i += 1
            if not self._session.is_connected():
                break

            seqno = self._session.get_seqno()
            if seqno != self._last_seqno:
                # The session reconnected: abort in-flight transactions and
                # restart the state machine from the _Server schema request.
                self._last_seqno = seqno
                self.__txn_abort_all()
                self.__send_server_schema_request()
                if self.lock_name:
                    self.__send_lock_request()
                break

            msg = self._session.recv()
            if msg is None:
                break

            if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update2"
                    and len(msg.params) == 2):
                # Database contents changed.
                self.__parse_update(msg.params[1], OVSDB_UPDATE2)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update"
                    and len(msg.params) == 2):
                # Database contents changed.
                if msg.params[0] == str(self.server_monitor_uuid):
                    # Update to the _Server database: revalidate the server
                    # without signaling a data change to the client.
                    self.__parse_update(msg.params[1], OVSDB_UPDATE,
                                        tables=self.server_tables)
                    self.change_seqno = initial_change_seqno
                    if not self.__check_server_db():
                        self.force_reconnect()
                        break
                else:
                    self.__parse_update(msg.params[1], OVSDB_UPDATE)
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._monitor_request_id is not None
                    and self._monitor_request_id == msg.id):
                # Reply to our "monitor" request.
                try:
                    self.change_seqno += 1
                    self._monitor_request_id = None
                    self.__clear()
                    if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
                        self.__parse_update(msg.result, OVSDB_UPDATE2)
                    else:
                        assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
                        self.__parse_update(msg.result, OVSDB_UPDATE)
                except error.Error as e:
                    vlog.err("%s: parse error in received schema: %s"
                             % (self._session.get_name(), e))
                    self.__error()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._server_schema_request_id is not None
                    and self._server_schema_request_id == msg.id):
                # Reply to our "get_schema" of _Server request.
                try:
                    self._server_schema_request_id = None
                    sh = SchemaHelper(None, msg.result)
                    sh.register_table(self._server_db_table)
                    schema = sh.get_idl_schema()
                    self._server_db = schema
                    self.server_tables = schema.tables
                    self.__send_server_monitor_request()
                except error.Error as e:
                    vlog.err("%s: error receiving server schema: %s"
                             % (self._session.get_name(), e))
                    if self.cluster_id:
                        # Without _Server we cannot validate cluster
                        # membership, so this server is unusable.
                        self.__error()
                        break
                    else:
                        # Old server without a _Server database: fall back to
                        # monitoring the data database directly.
                        self.change_seqno = initial_change_seqno
                        self.__send_monitor_request()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._server_monitor_request_id is not None
                    and self._server_monitor_request_id == msg.id):
                # Reply to our "monitor" of _Server request.
                try:
                    self._server_monitor_request_id = None
                    self.__parse_update(msg.result, OVSDB_UPDATE,
                                        tables=self.server_tables)
                    self.change_seqno = initial_change_seqno
                    if self.__check_server_db():
                        self.__send_monitor_request()
                        self.__send_db_change_aware()
                    else:
                        self.force_reconnect()
                        break
                except error.Error as e:
                    vlog.err("%s: parse error in received schema: %s"
                             % (self._session.get_name(), e))
                    if self.cluster_id:
                        self.__error()
                        break
                    else:
                        self.change_seqno = initial_change_seqno
                        self.__send_monitor_request()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._db_change_aware_request_id is not None
                    and self._db_change_aware_request_id == msg.id):
                # Reply to us notifying the server of our change awareness.
                self._db_change_aware_request_id = None
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._lock_request_id is not None
                    and self._lock_request_id == msg.id):
                # Reply to our "lock" request.
                self.__parse_lock_reply(msg.result)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "locked"):
                # We got our lock.
                self.__parse_lock_notify(msg.params, True)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "stolen"):
                # Someone else stole our lock.
                self.__parse_lock_notify(msg.params, False)
            elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
                # Reply to our echo request.  Ignore it.
                pass
            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                  self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
                  self._monitor_request_id == msg.id):
                # Server doesn't support "monitor_cond"; retry with plain
                # "monitor".
                if msg.error == "unknown method":
                    self.__send_monitor_request()
            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                  self._server_schema_request_id is not None and
                  self._server_schema_request_id == msg.id):
                self._server_schema_request_id = None
                if self.cluster_id:
                    self.force_reconnect()
                    break
                else:
                    self.change_seqno = initial_change_seqno
                    self.__send_monitor_request()
            elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                               ovs.jsonrpc.Message.T_REPLY)
                    and self.__txn_process_reply(msg)):
                # __txn_process_reply() did everything needed.
                pass
            else:
                # This can happen if a transaction is destroyed before we
                # receive the reply, so keep the log level low.
                vlog.dbg("%s: received unexpected %s message"
                         % (self._session.get_name(),
                            ovs.jsonrpc.Message.type_to_string(msg.type)))

        return initial_change_seqno != self.change_seqno

    def send_cond_change(self):
        """Sends pending "monitor_cond_change" requests for tables whose
        condition changed since the last call, clearing each table's
        cond_changed flag."""
        if not self._session.is_connected():
            return

        for table in six.itervalues(self.tables):
            if table.cond_changed:
                self.__send_cond_change(table, table.condition)
                table.cond_changed = False

    def cond_change(self, table_name, cond):
        """Sets the condition for 'table_name' to 'cond', which should be a
        conditional expression suitable for use directly in the OVSDB
        protocol, with the exception that the empty condition []
        matches no rows (instead of matching every row).  That is, []
        is equivalent to [False], not to [True].
        """

        table = self.tables.get(table_name)
        if not table:
            raise error.Error('Unknown table "%s"' % table_name)

        if cond == []:
            cond = [False]
        if table.condition != cond:
            table.condition = cond
            table.cond_changed = True

    def wait(self, poller):
        """Arranges for poller.block() to wake up when self.run() has something
        to do or when activity occurs on a transaction on 'self'."""
        self._session.wait(poller)
        self._session.recv_wait(poller)

    def has_ever_connected(self):
        """Returns True, if the IDL successfully connected to the remote
        database and retrieved its contents (even if the connection
        subsequently dropped and is in the process of reconnecting).  If so,
        then the IDL contains an atomic snapshot of the database's contents
        (but it might be arbitrarily old if the connection dropped).

        Returns False if the IDL has never connected or retrieved the
        database's contents.  If so, the IDL is empty."""
        return self.change_seqno != 0

    def force_reconnect(self):
        """Forces the IDL to drop its connection to the database and reconnect.
        In the meantime, the contents of the IDL will not change."""
        self._session.force_reconnect()

    def session_name(self):
        """Returns the name of the session's current remote, for logging."""
        return self._session.get_name()

    def set_lock(self, lock_name):
        """If 'lock_name' is not None, configures the IDL to obtain the named
        lock from the database server and to avoid modifying the database when
        the lock cannot be acquired (that is, when another client has the same
        lock).

        If 'lock_name' is None, drops the locking requirement and releases the
        lock."""
        assert not self.txn
        assert not self._outstanding_txns

        if self.lock_name and (not lock_name or lock_name != self.lock_name):
            # Release previous lock.
            self.__send_unlock_request()
            self.lock_name = None
            self.is_lock_contended = False

        if lock_name and not self.lock_name:
            # Acquire new lock.
            self.lock_name = lock_name
            self.__send_lock_request()

    def notify(self, event, row, updates=None):
        """Hook for implementing create/update/delete notifications

        :param event:   The event that was triggered
        :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
        :param row:     The row as it is after the operation has occured
        :type row:      Row
        :param updates: For updates, row with only old values of the changed
                        columns
        :type updates:  Row
        """

    def __send_cond_change(self, table, cond):
        # Rotate self.uuid so that replies to the old monitor session are
        # distinguishable from the new conditional monitor.
        monitor_cond_change = {table.name: [{"where": cond}]}
        old_uuid = str(self.uuid)
        self.uuid = uuid.uuid1()
        params = [old_uuid, str(self.uuid), monitor_cond_change]
        msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
        self._session.send(msg)

    def __clear(self):
        # Drop all replicated rows, bumping change_seqno if anything was
        # actually discarded.
        changed = False

        for table in six.itervalues(self.tables):
            if table.rows:
                changed = True
            table.rows = custom_index.IndexedRows(table)

        if changed:
            self.change_seqno += 1

    def __update_has_lock(self, new_has_lock):
        if new_has_lock and not self.has_lock:
            if self._monitor_request_id is None:
                self.change_seqno += 1
            else:
                # We're waiting for a monitor reply, so don't signal that the
                # database changed.  The monitor reply will increment
                # change_seqno anyhow.
                pass
            self.is_lock_contended = False
        self.has_lock = new_has_lock

    def __do_send_lock_request(self, method):
        # Shared implementation for "lock" and "unlock".  Returns the JSON-RPC
        # id of the sent request, or None if the session is disconnected.
        self.__update_has_lock(False)
        self._lock_request_id = None
        if self._session.is_connected():
            msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
            msg_id = msg.id
            self._session.send(msg)
        else:
            msg_id = None
        return msg_id

    def __send_lock_request(self):
        self._lock_request_id = self.__do_send_lock_request("lock")

    def __send_unlock_request(self):
        self.__do_send_lock_request("unlock")

    def __parse_lock_reply(self, result):
        self._lock_request_id = None
        got_lock = isinstance(result, dict) and result.get("locked") is True
        self.__update_has_lock(got_lock)
        if not got_lock:
            self.is_lock_contended = True

    def __parse_lock_notify(self, params, new_has_lock):
        # Handle "locked"/"stolen" notifications; only act if they name the
        # lock we actually requested.
        if (self.lock_name is not None
                and isinstance(params, (list, tuple))
                and params
                and params[0] == self.lock_name):
            self.__update_has_lock(new_has_lock)
            if not new_has_lock:
                self.is_lock_contended = True

    def __send_db_change_aware(self):
        # Ask the server to notify us (instead of just disconnecting us) when
        # the set of databases changes.
        msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
                                                 [True])
        self._db_change_aware_request_id = msg.id
        self._session.send(msg)

    def __send_monitor_request(self):
        if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
                           self.IDL_S_INITIAL]):
            self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
            method = "monitor_cond"
        else:
            self.state = self.IDL_S_DATA_MONITOR_REQUESTED
            method = "monitor"

        monitor_requests = {}
        for table in six.itervalues(self.tables):
            columns = []
            for column in six.iterkeys(table.columns):
                if ((table.name not in self.readonly) or
                        (table.name in self.readonly) and
                        (column not in self.readonly[table.name])):
                    columns.append(column)
            monitor_request = {"columns": columns}
            if method == "monitor_cond" and table.condition != [True]:
                monitor_request["where"] = table.condition
                # Bug fix: this previously assigned 'table.cond_change', a
                # nonexistent attribute, which left 'cond_changed' set and
                # made send_cond_change() re-send a condition that is already
                # part of this monitor_cond request.
                table.cond_changed = False
            monitor_requests[table.name] = [monitor_request]

        msg = ovs.jsonrpc.Message.create_request(
            method, [self._db.name, str(self.uuid), monitor_requests])
        self._monitor_request_id = msg.id
        self._session.send(msg)

    def __send_server_schema_request(self):
        self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
        msg = ovs.jsonrpc.Message.create_request(
            "get_schema", [self._server_db_name, str(self.uuid)])
        self._server_schema_request_id = msg.id
        self._session.send(msg)

    def __send_server_monitor_request(self):
        self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
        monitor_requests = {}
        table = self.server_tables[self._server_db_table]
        columns = [column for column in six.iterkeys(table.columns)]
        for column in six.itervalues(table.columns):
            if not hasattr(column, 'alert'):
                column.alert = True
        table.rows = custom_index.IndexedRows(table)
        table.need_table = False
        table.idl = self
        monitor_request = {"columns": columns}
        monitor_requests[table.name] = [monitor_request]
        msg = ovs.jsonrpc.Message.create_request(
            'monitor', [self._server_db.name,
                        str(self.server_monitor_uuid),
                        monitor_requests])
        self._server_monitor_request_id = msg.id
        self._session.send(msg)

    def __parse_update(self, update, version, tables=None):
        # Parse a <table-updates> object, logging (not raising) any protocol
        # error so one bad update cannot wedge the IDL.
        try:
            if not tables:
                self.__do_parse_update(update, version, self.tables)
            else:
                self.__do_parse_update(update, version, tables)
        except error.Error as e:
            vlog.err("%s: error parsing update: %s"
                     % (self._session.get_name(), e))

    def __do_parse_update(self, table_updates, version, tables):
        if not isinstance(table_updates, dict):
            raise error.Error("<table-updates> is not an object",
                              table_updates)

        for table_name, table_update in six.iteritems(table_updates):
            table = tables.get(table_name)
            if not table:
                raise error.Error('<table-updates> includes unknown '
                                  'table "%s"' % table_name)

            if not isinstance(table_update, dict):
                raise error.Error('<table-update> for table "%s" is not '
                                  'an object' % table_name, table_update)

            for uuid_string, row_update in six.iteritems(table_update):
                if not ovs.ovsuuid.is_valid_string(uuid_string):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains bad UUID "%s" as member '
                                      'name' % (table_name, uuid_string),
                                      table_update)
                uuid = ovs.ovsuuid.from_string(uuid_string)

                if not isinstance(row_update, dict):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains <row-update> for %s that '
                                      'is not an object'
                                      % (table_name, uuid_string))

                if version == OVSDB_UPDATE2:
                    if self.__process_update2(table, uuid, row_update):
                        self.change_seqno += 1
                    continue

                parser = ovs.db.parser.Parser(row_update, "row-update")
                old = parser.get_optional("old", [dict])
                new = parser.get_optional("new", [dict])
                parser.finish()

                if not old and not new:
                    raise error.Error('<row-update> missing "old" and '
                                      '"new" members', row_update)

                if self.__process_update(table, uuid, old, new):
                    self.change_seqno += 1

    def __process_update2(self, table, uuid, row_update):
        """Applies an "update2"-style <row-update2> (delete/insert/initial/
        modify).  Returns True if a column changed, False otherwise."""
        row = table.rows.get(uuid)
        changed = False
        if "delete" in row_update:
            if row:
                del table.rows[uuid]
                self.notify(ROW_DELETE, row)
                changed = True
            else:
                # XXX rate-limit
                vlog.warn("cannot delete missing row %s from table"
                          "%s" % (uuid, table.name))
        elif "insert" in row_update or "initial" in row_update:
            if row:
                vlog.warn("cannot add existing row %s from table"
                          " %s" % (uuid, table.name))
                del table.rows[uuid]
            row = self.__create_row(table, uuid)
            if "insert" in row_update:
                row_update = row_update['insert']
            else:
                row_update = row_update['initial']
            self.__add_default(table, row_update)
            changed = self.__row_update(table, row, row_update)
            table.rows[uuid] = row
            if changed:
                self.notify(ROW_CREATE, row)
        elif "modify" in row_update:
            if not row:
                raise error.Error('Modify non-existing row')

            old_row = self.__apply_diff(table, row, row_update['modify'])
            self.notify(ROW_UPDATE, row, Row(self, table, uuid, old_row))
            changed = True
        else:
            raise error.Error('<row-update> unknown operation',
                              row_update)
        return changed

    def __process_update(self, table, uuid, old, new):
        """Returns True if a column changed, False otherwise."""
        row = table.rows.get(uuid)
        changed = False
        if not new:
            # Delete row.
            if row:
                del table.rows[uuid]
                changed = True
                self.notify(ROW_DELETE, row)
            else:
                # XXX rate-limit
                vlog.warn("cannot delete missing row %s from table %s"
                          % (uuid, table.name))
        elif not old:
            # Insert row.
            op = ROW_CREATE
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
            else:
                # XXX rate-limit
                op = ROW_UPDATE
                vlog.warn("cannot add existing row %s to table %s"
                          % (uuid, table.name))
            changed |= self.__row_update(table, row, new)
            if op == ROW_CREATE:
                table.rows[uuid] = row
            if changed:
                self.notify(ROW_CREATE, row)
        else:
            op = ROW_UPDATE
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
                op = ROW_CREATE
                # XXX rate-limit
                vlog.warn("cannot modify missing row %s in table %s"
                          % (uuid, table.name))
            changed |= self.__row_update(table, row, new)
            if op == ROW_CREATE:
                table.rows[uuid] = row
            if changed:
                self.notify(op, row, Row.from_json(self, table, uuid, old))
        return changed

    def __check_server_db(self):
        """Returns True if this is a valid server database, False otherwise."""
        session_name = self.session_name()

        if self._server_db_table not in self.server_tables:
            vlog.info("%s: server does not have %s table in its %s database"
                      % (session_name, self._server_db_table,
                         self._server_db_name))
            return False

        rows = self.server_tables[self._server_db_table].rows

        database = None
        for row in six.itervalues(rows):
            if self.cluster_id:
                # Cluster ids are matched on their first four characters.
                if self.cluster_id in \
                   map(lambda x: str(x)[:4], row.cid):
                    database = row
                    break
            elif row.name == self._db.name:
                database = row
                break

        if not database:
            vlog.info("%s: server does not have %s database"
                      % (session_name, self._db.name))
            return False

        if (database.model == CLUSTERED and
                self._session.get_num_of_remotes() > 1):
            if not database.schema:
                vlog.info('%s: clustered database server has not yet joined '
                          'cluster; trying another server' % session_name)
                return False
            if not database.connected:
                vlog.info('%s: clustered database server is disconnected '
                          'from cluster; trying another server' % session_name)
                return False
            if (self.leader_only and
                    not database.leader):
                vlog.info('%s: clustered database server is not cluster '
                          'leader; trying another server' % session_name)
                return False
            if database.index:
                if database.index[0] < self._min_index:
                    vlog.warn('%s: clustered database server has stale data; '
                              'trying another server' % session_name)
                    return False
                self._min_index = database.index[0]

        return True

    def __column_name(self, column):
        # Returns the default value for 'column', in JSON form for UUID keys.
        # (Despite the name, this yields a value, not a name.)
        if column.type.key.type == ovs.db.types.UuidType:
            return ovs.ovsuuid.to_json(column.type.key.type.default)
        else:
            return column.type.key.type.default

    def __add_default(self, table, row_update):
        # Fill in defaults for required (n_min != 0), non-map, writable
        # columns that the server omitted from 'row_update'.
        for column in six.itervalues(table.columns):
            if column.name not in row_update:
                if ((table.name not in self.readonly) or
                        (table.name in self.readonly) and
                        (column.name not in self.readonly[table.name])):
                    if column.type.n_min != 0 and not column.type.is_map():
                        row_update[column.name] = self.__column_name(column)

    def __apply_diff(self, table, row, row_diff):
        # Apply an "update2" diff to 'row' in place.  Returns a dict of the
        # previous Datum of every touched column, for the notify() hook.
        old_row = {}
        for column_name, datum_diff_json in six.iteritems(row_diff):
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum_diff = data.Datum.from_json(column.type, datum_diff_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            old_row[column_name] = row._data[column_name].copy()
            datum = row._data[column_name].diff(datum_diff)
            if datum != row._data[column_name]:
                row._data[column_name] = datum

        return old_row

    def __row_update(self, table, row, row_json):
        # Overwrite 'row' columns from a JSON row.  Returns True if any
        # column with alert=True actually changed value.
        changed = False
        for column_name, datum_json in six.iteritems(row_json):
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum = data.Datum.from_json(column.type, datum_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            if datum != row._data[column_name]:
                row._data[column_name] = datum
                if column.alert:
                    changed = True
            else:
                # Didn't really change but the OVSDB monitor protocol always
                # includes every value in a row.
                pass
        return changed

    def __create_row(self, table, uuid):
        # Build a Row with every column set to its default Datum.  (The local
        # is deliberately not named 'data' to avoid shadowing the ovs.db.data
        # module imported as 'data' at the top of this file.)
        row_data = {}
        for column in six.itervalues(table.columns):
            row_data[column.name] = ovs.db.data.Datum.default(column.type)
        return Row(self, table, uuid, row_data)

    def __error(self):
        self._session.force_reconnect()

    def __txn_abort_all(self):
        # Mark every outstanding transaction TRY_AGAIN; used on reconnect.
        while self._outstanding_txns:
            txn = self._outstanding_txns.popitem()[1]
            txn._status = Transaction.TRY_AGAIN

    def __txn_process_reply(self, msg):
        # Route 'msg' to the transaction that sent it, if any.  Returns True
        # if the message was consumed.
        txn = self._outstanding_txns.pop(msg.id, None)
        if txn:
            txn._process_reply(msg)
            return True
885
886 def _uuid_to_row(atom, base):
887 if base.ref_table:
888 return base.ref_table.rows.get(atom)
889 else:
890 return atom
891
892
def _row_to_uuid(value):
    # Inverse of _uuid_to_row: collapse a Row to its UUID for serialization;
    # any non-Row value passes through unchanged.
    return value.uuid if isinstance(value, Row) else value
898
899
@functools.total_ordering
class Row(object):
    """A row within an IDL.

    The client may access the following attributes directly:

    - 'uuid': a uuid.UUID object whose value is the row's database UUID.

    - An attribute for each column in the Row's table, named for the column,
      whose values are as returned by Datum.to_python() for the column's
      type.

      If some error occurs (e.g. the database server's idea of the column is
      different from the IDL's idea), then the attribute value is the
      "default" value returned by Datum.default() for the column's type.  (It
      is important to know this because the default value may violate
      constraints for the column's type, e.g. the default integer value is 0
      even if column constraints require the column's value to be positive.)

    When a transaction is active, column attributes may also be assigned new
    values.  Committing the transaction will then cause the new value to be
    stored into the database.

    *NOTE*: In the current implementation, the value of a column is a *copy*
    of the value in the database.  This means that modifying its value
    directly will have no useful effect.  For example, the following:
        row.mycolumn["a"] = "b"              # don't do this
    will not change anything in the database, even after commit.  To modify
    the column, instead assign the modified column value back to the column:
        d = row.mycolumn
        d["a"] = "b"
        row.mycolumn = d
    """
    def __init__(self, idl, table, uuid, data):
        # All of the explicit references to self.__dict__ below are required
        # to set real attributes without invoking self.__setattr__() (which
        # would try to record a database write).
        self.__dict__["uuid"] = uuid

        self.__dict__["_idl"] = idl
        self.__dict__["_table"] = table

        # _data is the committed data.  It takes the following values:
        #
        # - A dictionary that maps every column name to a Datum, if the row
        #   exists in the committed form of the database.
        #
        # - None, if this row is newly inserted within the active transaction
        #   and thus has no committed form.
        self.__dict__["_data"] = data

        # _changes describes changes to this row within the active
        # transaction.  It takes the following values:
        #
        # - {}, the empty dictionary, if no transaction is active or if the
        #   row has not yet been changed within this transaction.
        #
        # - A dictionary that maps a column name to its new Datum, if an
        #   active transaction changes those columns' values.
        #
        # - A dictionary that maps every column name to a Datum, if the row
        #   is newly inserted within the active transaction.
        #
        # - None, if this transaction deletes this row.
        self.__dict__["_changes"] = {}

        # _mutations describes changes to this row to be handled via a
        # mutate operation on the wire.  It takes the following values:
        #
        # - {}, the empty dictionary, if no transaction is active or if the
        #   row has not yet been mutated within this transaction.
        #
        # - A dictionary that contains two keys:
        #
        #   - "_inserts" contains a dictionary that maps column names to
        #     new keys/key-value pairs that should be inserted into the
        #     column
        #   - "_removes" contains a dictionary that maps column names to
        #     the keys/key-value pairs that should be removed from the
        #     column
        #
        # - None, if this transaction deletes this row.
        self.__dict__["_mutations"] = {}

        # A dictionary whose keys are the names of columns that must be
        # verified as prerequisites when the transaction commits.  The values
        # in the dictionary are all None.
        self.__dict__["_prereqs"] = {}

    def __lt__(self, other):
        """Rows order by database UUID (functools.total_ordering derives the
        remaining comparison operators from this and __eq__)."""
        if not isinstance(other, Row):
            return NotImplemented
        return bool(self.__dict__['uuid'] < other.__dict__['uuid'])

    def __eq__(self, other):
        """Rows compare equal when their database UUIDs are equal."""
        if not isinstance(other, Row):
            return NotImplemented
        return bool(self.__dict__['uuid'] == other.__dict__['uuid'])

    def __hash__(self):
        # Hash on the UUID, consistent with __eq__.
        return int(self.__dict__['uuid'])

    def __str__(self):
        # NOTE(review): getattr() goes through __getattr__, which raises
        # AttributeError for a column never set on a newly inserted row --
        # confirm callers only format fully populated rows.
        return "{table}({data})".format(
            table=self._table.name,
            data=", ".join("{col}={val}".format(col=c, val=getattr(self, c))
                           for c in sorted(self._table.columns)))

    def __getattr__(self, column_name):
        """Returns the effective value of column 'column_name': the committed
        Datum overlaid with any pending changes and set/map mutations made
        within the active transaction, converted via Datum.to_python().

        Raises AttributeError if 'column_name' is not a column of this row's
        table or has no value yet."""
        assert self._changes is not None
        assert self._mutations is not None

        try:
            column = self._table.columns[column_name]
        except KeyError:
            raise AttributeError("%s instance has no attribute '%s'" %
                                 (self.__class__.__name__, column_name))
        datum = self._changes.get(column_name)
        inserts = None
        if '_inserts' in self._mutations.keys():
            inserts = self._mutations['_inserts'].get(column_name)
        removes = None
        if '_removes' in self._mutations.keys():
            removes = self._mutations['_removes'].get(column_name)
        if datum is None:
            if self._data is None:
                # Newly inserted row: only pending inserts can supply a
                # value.
                if inserts is None:
                    raise AttributeError("%s instance has no attribute '%s'" %
                                         (self.__class__.__name__,
                                          column_name))
                else:
                    datum = data.Datum.from_python(column.type,
                                                   inserts,
                                                   _row_to_uuid)
            elif column_name in self._data:
                datum = self._data[column_name]
                if column.type.is_set():
                    # Overlay set mutations: append inserts, filter removes.
                    dlist = datum.as_list()
                    if inserts is not None:
                        dlist.extend(list(inserts))
                    if removes is not None:
                        removes_datum = data.Datum.from_python(column.type,
                                                               removes,
                                                               _row_to_uuid)
                        removes_list = removes_datum.as_list()
                        dlist = [x for x in dlist if x not in removes_list]
                    datum = data.Datum.from_python(column.type, dlist,
                                                   _row_to_uuid)
                elif column.type.is_map():
                    # Overlay map mutations; a key both inserted and removed
                    # keeps its inserted value.
                    dmap = datum.to_python(_uuid_to_row)
                    if inserts is not None:
                        dmap.update(inserts)
                    if removes is not None:
                        for key in removes:
                            if key not in (inserts or {}):
                                dmap.pop(key, None)
                    datum = data.Datum.from_python(column.type, dmap,
                                                   _row_to_uuid)
            else:
                if inserts is None:
                    raise AttributeError("%s instance has no attribute '%s'" %
                                         (self.__class__.__name__,
                                          column_name))
                else:
                    # NOTE(review): 'inserts' is a raw set/dict here, not a
                    # Datum, so to_python() below may fail on this path --
                    # confirm.
                    datum = inserts

        return datum.to_python(_uuid_to_row)

    def __setattr__(self, column_name, value):
        """Assigns 'value' to column 'column_name' as part of the active
        transaction; the write is sent to the database at commit.  Writes to
        columns registered as read-only in the Idl are dropped with a
        warning.  Indexes containing the column are updated."""
        assert self._changes is not None
        assert self._idl.txn

        if ((self._table.name in self._idl.readonly) and
                (column_name in self._idl.readonly[self._table.name])):
            vlog.warn("attempting to write to readonly column %s"
                      % column_name)
            return

        column = self._table.columns[column_name]
        try:
            datum = data.Datum.from_python(column.type, value, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        # Remove prior version of the Row from the index if it has the indexed
        # column set, and the column changing is an indexed column
        if hasattr(self, column_name):
            for idx in self._table.rows.indexes.values():
                if column_name in (c.column for c in idx.columns):
                    idx.remove(self)
        self._idl.txn._write(self, column, datum)
        for idx in self._table.rows.indexes.values():
            # Only update the index if indexed columns change
            if column_name in (c.column for c in idx.columns):
                idx.add(self)

    def addvalue(self, column_name, key):
        """Schedules 'key' to be inserted into column 'column_name' via an
        OVSDB "mutate" operation at commit time.  'key' is validated against
        the column type first; invalid values are logged and dropped."""
        self._idl.txn._txn_rows[self.uuid] = self
        column = self._table.columns[column_name]
        try:
            data.Datum.from_python(column.type, key, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        inserts = self._mutations.setdefault('_inserts', {})
        column_value = inserts.setdefault(column_name, set())
        column_value.add(key)

    def delvalue(self, column_name, key):
        """Schedules 'key' to be removed from column 'column_name' via an
        OVSDB "mutate" operation at commit time.  'key' is validated against
        the column type first; invalid values are logged and dropped."""
        self._idl.txn._txn_rows[self.uuid] = self
        column = self._table.columns[column_name]
        try:
            data.Datum.from_python(column.type, key, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to delete bad value from column %s (%s)"
                     % (column_name, e))
            return
        removes = self._mutations.setdefault('_removes', {})
        column_value = removes.setdefault(column_name, set())
        column_value.add(key)

    def setkey(self, column_name, key, value):
        """Schedules map column 'column_name' to map 'key' to 'value' at
        commit time.  If the committed row already has the column, any
        existing pair for 'key' is removed first so the insert replaces it."""
        self._idl.txn._txn_rows[self.uuid] = self
        column = self._table.columns[column_name]
        try:
            data.Datum.from_python(column.type, {key: value}, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        if self._data and column_name in self._data:
            # Remove existing key/value before updating.
            removes = self._mutations.setdefault('_removes', {})
            column_value = removes.setdefault(column_name, set())
            column_value.add(key)
        inserts = self._mutations.setdefault('_inserts', {})
        column_value = inserts.setdefault(column_name, {})
        column_value[key] = value

    def delkey(self, column_name, key, value=None):
        """Schedules removal of 'key' from map column 'column_name' at commit
        time.  When 'value' is given, removal is skipped unless the committed
        map currently holds exactly that key/value pair.

        NOTE(review): the guard is 'if value:', so a falsy 'value' (0, "",
        False) behaves as if no value had been passed -- confirm whether
        'value is not None' was intended."""
        self._idl.txn._txn_rows[self.uuid] = self
        if value:
            try:
                # Datum.to_python is invoked through the class with the
                # stored Datum as its first argument.
                old_value = data.Datum.to_python(self._data[column_name],
                                                 _uuid_to_row)
            except error.Error:
                return
            if key not in old_value:
                return
            if old_value[key] != value:
                return
        removes = self._mutations.setdefault('_removes', {})
        column_value = removes.setdefault(column_name, set())
        column_value.add(key)
        return

    @classmethod
    def from_json(cls, idl, table, uuid, row_json):
        """Builds a Row for 'table' from 'row_json', a JSON dictionary that
        maps column names to column values.  Unknown columns and unparsable
        values are logged and skipped."""
        data = {}
        for column_name, datum_json in six.iteritems(row_json):
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s in table %s"
                          % (column_name, table.name))
                continue
            try:
                datum = ovs.db.data.Datum.from_json(column.type, datum_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue
            data[column_name] = datum
        return cls(idl, table, uuid, data)

    def verify(self, column_name):
        """Causes the original contents of column 'column_name' in this row to
        be verified as a prerequisite to completing the transaction.  That is,
        if 'column_name' changed in this row (or if this row was deleted)
        between the time that the IDL originally read its contents and the
        time that the transaction commits, then the transaction aborts and
        Transaction.commit() returns Transaction.TRY_AGAIN.

        The intention is that, to ensure that no transaction commits based on
        dirty reads, an application should call Row.verify() on each data
        item read as part of a read-modify-write operation.

        In some cases Row.verify() reduces to a no-op, because the current
        value of the column is already known:

          - If this row is a row created by the current transaction (returned
            by Transaction.insert()).

          - If the column has already been modified within the current
            transaction.

        Because of the latter property, always call Row.verify() *before*
        modifying the column, for a given read-modify-write.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if not self._data or column_name in self._changes:
            return

        self._prereqs[column_name] = None

    def delete(self):
        """Deletes this row from its table.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if self._data is None:
            # Inserted within this same transaction: just forget it.
            del self._idl.txn._txn_rows[self.uuid]
        else:
            self._idl.txn._txn_rows[self.uuid] = self
        del self._table.rows[self.uuid]
        # _changes == None marks this row as deleted by the transaction.
        self.__dict__["_changes"] = None

    def fetch(self, column_name):
        """Arranges for the current database value of 'column_name' in this
        row to be fetched when the transaction commits."""
        self._idl.txn._fetch(self, column_name)

    def increment(self, column_name):
        """Causes the transaction, when committed, to increment the value of
        'column_name' within this row by 1.  'column_name' must have an
        integer type.  After the transaction commits successfully, the client
        may retrieve the final (incremented) value of 'column_name' with
        Transaction.get_increment_new_value().

        The client could accomplish something similar by reading and writing
        and verify()ing columns.  However, increment() will never (by itself)
        cause a transaction to fail because of a verify error.

        The intended use is for incrementing the "next_cfg" column in
        the Open_vSwitch table."""
        self._idl.txn._increment(self, column_name)
1242
1243
def _uuid_name_from_uuid(uuid):
    """Returns a "named-uuid" identifier for 'uuid': "row<uuid>" with every
    dash replaced by an underscore."""
    return "row" + str(uuid).replace("-", "_")
1246
1247
def _where_uuid_equals(uuid):
    """Builds an OVSDB "where" clause that matches exactly the row whose
    _uuid column equals 'uuid'."""
    clause = ["_uuid", "==", ["uuid", str(uuid)]]
    return [clause]
1250
1251
class _InsertedRow(object):
    """Bookkeeping for a row created by Transaction.insert(): remembers the
    result index of its "insert" operation and, once the commit reply is
    processed, the permanent UUID the server assigned."""

    def __init__(self, op_index):
        self.real = None          # Permanent UUID, filled in from the reply.
        self.op_index = op_index
1256
1257
1258 class Transaction(object):
1259 """A transaction may modify the contents of a database by modifying the
1260 values of columns, deleting rows, inserting rows, or adding checks that
1261 columns in the database have not changed ("verify" operations), through
1262 Row methods.
1263
1264 Reading and writing columns and inserting and deleting rows are all
1265 straightforward. The reasons to verify columns are less obvious.
1266 Verification is the key to maintaining transactional integrity. Because
1267 OVSDB handles multiple clients, it can happen that between the time that
1268 OVSDB client A reads a column and writes a new value, OVSDB client B has
1269 written that column. Client A's write should not ordinarily overwrite
1270 client B's, especially if the column in question is a "map" column that
1271 contains several more or less independent data items. If client A adds a
1272 "verify" operation before it writes the column, then the transaction fails
1273 in case client B modifies it first. Client A will then see the new value
1274 of the column and compose a new transaction based on the new contents
1275 written by client B.
1276
1277 When a transaction is complete, which must be before the next call to
1278 Idl.run(), call Transaction.commit() or Transaction.abort().
1279
1280 The life-cycle of a transaction looks like this:
1281
1282 1. Create the transaction and record the initial sequence number:
1283
1284 seqno = idl.change_seqno(idl)
1285 txn = Transaction(idl)
1286
1287 2. Modify the database with Row and Transaction methods.
1288
1289 3. Commit the transaction by calling Transaction.commit(). The first call
1290 to this function probably returns Transaction.INCOMPLETE. The client
1291 must keep calling again along as this remains true, calling Idl.run() in
1292 between to let the IDL do protocol processing. (If the client doesn't
1293 have anything else to do in the meantime, it can use
1294 Transaction.commit_block() to avoid having to loop itself.)
1295
1296 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
1297 to change from the saved 'seqno' (it's possible that it's already
1298 changed, in which case the client should not wait at all), then start
1299 over from step 1. Only a call to Idl.run() will change the return value
1300 of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""
1301
1302 # Status values that Transaction.commit() can return.
1303
1304 # Not yet committed or aborted.
1305 UNCOMMITTED = "uncommitted"
1306 # Transaction didn't include any changes.
1307 UNCHANGED = "unchanged"
1308 # Commit in progress, please wait.
1309 INCOMPLETE = "incomplete"
1310 # ovsdb_idl_txn_abort() called.
1311 ABORTED = "aborted"
1312 # Commit successful.
1313 SUCCESS = "success"
1314 # Commit failed because a "verify" operation
1315 # reported an inconsistency, due to a network
1316 # problem, or other transient failure. Wait
1317 # for a change, then try again.
1318 TRY_AGAIN = "try again"
1319 # Server hasn't given us the lock yet.
1320 NOT_LOCKED = "not locked"
1321 # Commit failed due to a hard error.
1322 ERROR = "error"
1323
    @staticmethod
    def status_to_string(status):
        """Converts 'status', one of the values that Transaction.commit() can
        return, into a human-readable string.

        (The status constants are already human-readable strings, so this is
        the identity function.)"""
        return status
1332
1333 def __init__(self, idl):
1334 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
1335 A given Idl may only have a single active transaction at a time.
1336
1337 A Transaction may modify the contents of a database by assigning new
1338 values to columns (attributes of Row), deleting rows (with
1339 Row.delete()), or inserting rows (with Transaction.insert()). It may
1340 also check that columns in the database have not changed with
1341 Row.verify().
1342
1343 When a transaction is complete (which must be before the next call to
1344 Idl.run()), call Transaction.commit() or Transaction.abort()."""
1345 assert idl.txn is None
1346
1347 idl.txn = self
1348 self._request_id = None
1349 self.idl = idl
1350 self.dry_run = False
1351 self._txn_rows = {}
1352 self._status = Transaction.UNCOMMITTED
1353 self._error = None
1354 self._comments = []
1355
1356 self._inc_row = None
1357 self._inc_column = None
1358
1359 self._fetch_requests = []
1360
1361 self._inserted_rows = {} # Map from UUID to _InsertedRow
1362
1363 def add_comment(self, comment):
1364 """Appends 'comment' to the comments that will be passed to the OVSDB
1365 server when this transaction is committed. (The comment will be
1366 committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
1367 relatively human-readable form.)"""
1368 self._comments.append(comment)
1369
1370 def wait(self, poller):
1371 """Causes poll_block() to wake up if this transaction has completed
1372 committing."""
1373 if self._status not in (Transaction.UNCOMMITTED,
1374 Transaction.INCOMPLETE):
1375 poller.immediate_wake()
1376
1377 def _substitute_uuids(self, json):
1378 if isinstance(json, (list, tuple)):
1379 if (len(json) == 2
1380 and json[0] == 'uuid'
1381 and ovs.ovsuuid.is_valid_string(json[1])):
1382 uuid = ovs.ovsuuid.from_string(json[1])
1383 row = self._txn_rows.get(uuid, None)
1384 if row and row._data is None:
1385 return ["named-uuid", _uuid_name_from_uuid(uuid)]
1386 else:
1387 return [self._substitute_uuids(elem) for elem in json]
1388 return json
1389
    def __disassemble(self):
        """Disconnects this transaction from its IDL and reverts the IDL's
        in-memory replica to its pre-transaction state: rows deleted by the
        transaction go back into their tables, rows inserted by it are
        removed, and all per-row transaction bookkeeping is cleared."""
        self.idl.txn = None

        for row in six.itervalues(self._txn_rows):
            if row._changes is None:
                # If we add the deleted row back to rows with _changes == None
                # then __getattr__ will not work for the indexes
                row.__dict__["_changes"] = {}
                row.__dict__["_mutations"] = {}
                row._table.rows[row.uuid] = row
            elif row._data is None:
                # Inserted by this transaction: drop it from the replica.
                del row._table.rows[row.uuid]
            row.__dict__["_changes"] = {}
            row.__dict__["_mutations"] = {}
            row.__dict__["_prereqs"] = {}
        self._txn_rows = {}
1406
    def commit(self):
        """Attempts to commit 'txn'.  Returns the status of the commit
        operation, one of the following constants:

          Transaction.INCOMPLETE:

              The transaction is in progress, but not yet complete.  The
              caller should call again later, after calling Idl.run() to let
              the IDL do OVSDB protocol processing.

          Transaction.UNCHANGED:

              The transaction is complete.  (It didn't actually change the
              database, so the IDL didn't send any request to the database
              server.)

          Transaction.ABORTED:

              The caller previously called Transaction.abort().

          Transaction.SUCCESS:

              The transaction was successful.  The update made by the
              transaction (and possibly other changes made by other database
              clients) should already be visible in the IDL.

          Transaction.TRY_AGAIN:

              The transaction failed for some transient reason, e.g. because
              a "verify" operation reported an inconsistency or due to a
              network problem.  The caller should wait for a change to the
              database, then compose a new transaction, and commit the new
              transaction.

              Use Idl.change_seqno to wait for a change in the database.  It
              is important to use its value *before* the initial call to
              Transaction.commit() as the baseline for this purpose, because
              the change that one should wait for can happen after the
              initial call but before the call that returns
              Transaction.TRY_AGAIN, and using some other baseline value in
              that situation could cause an indefinite wait if the database
              rarely changes.

          Transaction.NOT_LOCKED:

              The transaction failed because the IDL has been configured to
              require a database lock (with Idl.set_lock()) but didn't
              get it yet or has already lost it.

        Committing a transaction rolls back all of the changes that it made
        to the IDL's copy of the database.  If the transaction commits
        successfully, then the database server will send an update and, thus,
        the IDL will be updated with the committed changes."""
        # The status can only change if we're the active transaction.
        # (Otherwise, our status will change only in Idl.run().)
        if self != self.idl.txn:
            return self._status

        # If we need a lock but don't have it, give up quickly.
        if self.idl.lock_name and not self.idl.has_lock:
            self._status = Transaction.NOT_LOCKED
            self.__disassemble()
            return self._status

        # "transact" request parameters: the database name followed by one
        # JSON object per operation.
        operations = [self.idl._db.name]

        # Assert that we have the required lock (avoiding a race).
        if self.idl.lock_name:
            operations.append({"op": "assert",
                               "lock": self.idl.lock_name})

        # Add prerequisites and declarations of new rows.
        for row in six.itervalues(self._txn_rows):
            if row._prereqs:
                rows = {}
                columns = []
                for column_name in row._prereqs:
                    columns.append(column_name)
                    rows[column_name] = row._data[column_name].to_json()
                # Encode each prerequisite as a "wait" operation requiring
                # the columns to still equal their originally read values
                # (see Row.verify()).
                operations.append({"op": "wait",
                                   "table": row._table.name,
                                   "timeout": 0,
                                   "where": _where_uuid_equals(row.uuid),
                                   "until": "==",
                                   "columns": columns,
                                   "rows": [rows]})

        # Add updates.
        any_updates = False
        for row in six.itervalues(self._txn_rows):
            if row._changes is None:
                # Row deleted within this transaction.
                if row._table.is_root:
                    operations.append({"op": "delete",
                                       "table": row._table.name,
                                       "where": _where_uuid_equals(row.uuid)})
                    any_updates = True
                else:
                    # Let ovsdb-server decide whether to really delete it.
                    pass
            elif row._changes:
                op = {"table": row._table.name}
                if row._data is None:
                    op["op"] = "insert"
                    op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
                    any_updates = True

                    # Reply results correspond to operations[1:] (index 0 is
                    # the database name), so the op appended below will get
                    # result index len(operations) - 1.
                    op_index = len(operations) - 1
                    self._inserted_rows[row.uuid] = _InsertedRow(op_index)
                else:
                    op["op"] = "update"
                    op["where"] = _where_uuid_equals(row.uuid)

                row_json = {}
                op["row"] = row_json

                for column_name, datum in six.iteritems(row._changes):
                    if row._data is not None or not datum.is_default():
                        row_json[column_name] = (
                            self._substitute_uuids(datum.to_json()))

                        # If anything really changed, consider it an update.
                        # We can't suppress not-really-changed values earlier
                        # or transactions would become nonatomic (see the big
                        # comment inside Transaction._write()).
                        if (not any_updates and row._data is not None and
                                row._data[column_name] != datum):
                            any_updates = True

                if row._data is None or row_json:
                    operations.append(op)
            if row._mutations:
                addop = False
                op = {"table": row._table.name}
                op["op"] = "mutate"
                if row._data is None:
                    # New row
                    op["where"] = self._substitute_uuids(
                        _where_uuid_equals(row.uuid))
                else:
                    # Existing row
                    op["where"] = _where_uuid_equals(row.uuid)
                op["mutations"] = []
                if '_removes' in row._mutations.keys():
                    for col, dat in six.iteritems(row._mutations['_removes']):
                        column = row._table.columns[col]
                        if column.type.is_map():
                            # Map removals are keyed by the raw keys.
                            opdat = ["set"]
                            opdat.append(list(dat))
                        else:
                            # Set removals carry fully encoded datums.
                            opdat = ["set"]
                            inner_opdat = []
                            for ele in dat:
                                try:
                                    datum = data.Datum.from_python(column.type,
                                                                   ele, _row_to_uuid)
                                except error.Error:
                                    return
                                inner_opdat.append(
                                    self._substitute_uuids(datum.to_json()))
                            opdat.append(inner_opdat)
                        mutation = [col, "delete", opdat]
                        op["mutations"].append(mutation)
                        addop = True
                if '_inserts' in row._mutations.keys():
                    for col, val in six.iteritems(row._mutations['_inserts']):
                        column = row._table.columns[col]
                        if column.type.is_map():
                            opdat = ["map"]
                            datum = data.Datum.from_python(column.type, val,
                                                           _row_to_uuid)
                            opdat.append(datum.as_list())
                        else:
                            opdat = ["set"]
                            inner_opdat = []
                            for ele in val:
                                try:
                                    datum = data.Datum.from_python(column.type,
                                                                   ele, _row_to_uuid)
                                except error.Error:
                                    return
                                inner_opdat.append(
                                    self._substitute_uuids(datum.to_json()))
                            opdat.append(inner_opdat)
                        mutation = [col, "insert", opdat]
                        op["mutations"].append(mutation)
                        addop = True
                if addop:
                    operations.append(op)
                    any_updates = True

        if self._fetch_requests:
            for fetch in self._fetch_requests:
                # Record the result index of the "select" appended below.
                fetch["index"] = len(operations) - 1
                operations.append({"op": "select",
                                   "table": fetch["row"]._table.name,
                                   "where": self._substitute_uuids(
                                       _where_uuid_equals(fetch["row"].uuid)),
                                   "columns": [fetch["column_name"]]})
            any_updates = True

        # Add increment.
        if self._inc_row and any_updates:
            self._inc_index = len(operations) - 1

            operations.append({"op": "mutate",
                               "table": self._inc_row._table.name,
                               "where": self._substitute_uuids(
                                   _where_uuid_equals(self._inc_row.uuid)),
                               "mutations": [[self._inc_column, "+=", 1]]})
            operations.append({"op": "select",
                               "table": self._inc_row._table.name,
                               "where": self._substitute_uuids(
                                   _where_uuid_equals(self._inc_row.uuid)),
                               "columns": [self._inc_column]})

        # Add comment.
        if self._comments:
            operations.append({"op": "comment",
                               "comment": "\n".join(self._comments)})

        # Dry run?
        if self.dry_run:
            operations.append({"op": "abort"})

        if not any_updates:
            self._status = Transaction.UNCHANGED
        else:
            msg = ovs.jsonrpc.Message.create_request("transact", operations)
            self._request_id = msg.id
            # NOTE(review): send() appears to return a falsy value on success
            # and an error code on failure -- confirm against
            # ovs.jsonrpc.Session.send().
            if not self.idl._session.send(msg):
                self.idl._outstanding_txns[self._request_id] = self
                self._status = Transaction.INCOMPLETE
            else:
                self._status = Transaction.TRY_AGAIN

        self.__disassemble()
        return self._status
1642
1643 def commit_block(self):
1644 """Attempts to commit this transaction, blocking until the commit
1645 either succeeds or fails. Returns the final commit status, which may
1646 be any Transaction.* value other than Transaction.INCOMPLETE.
1647
1648 This function calls Idl.run() on this transaction'ss IDL, so it may
1649 cause Idl.change_seqno to change."""
1650 while True:
1651 status = self.commit()
1652 if status != Transaction.INCOMPLETE:
1653 return status
1654
1655 self.idl.run()
1656
1657 poller = ovs.poller.Poller()
1658 self.idl.wait(poller)
1659 self.wait(poller)
1660 poller.block()
1661
    def get_increment_new_value(self):
        """Returns the final (incremented) value of the column in this
        transaction that was set to be incremented by Row.increment.  This
        transaction must have committed successfully."""
        # _inc_new_value is filled in while the commit reply is processed
        # (__process_inc_reply, outside this view) -- hence the assertion.
        assert self._status == Transaction.SUCCESS
        return self._inc_new_value
1668
1669 def abort(self):
1670 """Aborts this transaction. If Transaction.commit() has already been
1671 called then the transaction might get committed anyhow."""
1672 self.__disassemble()
1673 if self._status in (Transaction.UNCOMMITTED,
1674 Transaction.INCOMPLETE):
1675 self._status = Transaction.ABORTED
1676
1677 def get_error(self):
1678 """Returns a string representing this transaction's current status,
1679 suitable for use in log messages."""
1680 if self._status != Transaction.ERROR:
1681 return Transaction.status_to_string(self._status)
1682 elif self._error:
1683 return self._error
1684 else:
1685 return "no error details available"
1686
    def __set_error_json(self, json):
        # Record only the first error encountered; later ones are ignored.
        # NOTE(review): relies on 'ovs.json' being reachable as an attribute
        # of the 'ovs' package even though this file never imports ovs.json
        # directly (only ovs.jsonrpc et al.) -- confirm.
        if self._error is None:
            self._error = ovs.json.to_string(json)
1690
1691 def get_insert_uuid(self, uuid):
1692 """Finds and returns the permanent UUID that the database assigned to a
1693 newly inserted row, given the UUID that Transaction.insert() assigned
1694 locally to that row.
1695
1696 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1697 or if it was assigned by that function and then deleted by Row.delete()
1698 within the same transaction. (Rows that are inserted and then deleted
1699 within a single transaction are never sent to the database server, so
1700 it never assigns them a permanent UUID.)
1701
1702 This transaction must have completed successfully."""
1703 assert self._status in (Transaction.SUCCESS,
1704 Transaction.UNCHANGED)
1705 inserted_row = self._inserted_rows.get(uuid)
1706 if inserted_row:
1707 return inserted_row.real
1708 return None
1709
    def _increment(self, row, column):
        # Internal helper for Row.increment(): records the row/column whose
        # value commit() should atomically increment.  Only one increment
        # per transaction is supported.
        assert not self._inc_row
        self._inc_row = row
        self._inc_column = column
1714
    def _fetch(self, row, column_name):
        # Internal helper for Row.fetch(): queues a "select" of 'column_name'
        # from 'row' to be appended to the transaction at commit time.
        self._fetch_requests.append({"row": row, "column_name": column_name})
1717
1718 def _write(self, row, column, datum):
1719 assert row._changes is not None
1720 assert row._mutations is not None
1721
1722 txn = row._idl.txn
1723
1724 # If this is a write-only column and the datum being written is the
1725 # same as the one already there, just skip the update entirely. This
1726 # is worth optimizing because we have a lot of columns that get
1727 # periodically refreshed into the database but don't actually change
1728 # that often.
1729 #
1730 # We don't do this for read/write columns because that would break
1731 # atomicity of transactions--some other client might have written a
1732 # different value in that column since we read it. (But if a whole
1733 # transaction only does writes of existing values, without making any
1734 # real changes, we will drop the whole transaction later in
1735 # ovsdb_idl_txn_commit().)
1736 if (not column.alert and row._data and
1737 row._data.get(column.name) == datum):
1738 new_value = row._changes.get(column.name)
1739 if new_value is None or new_value == datum:
1740 return
1741
1742 txn._txn_rows[row.uuid] = row
1743 if '_inserts' in row._mutations:
1744 row._mutations['_inserts'].pop(column.name, None)
1745 if '_removes' in row._mutations:
1746 row._mutations['_removes'].pop(column.name, None)
1747 row._changes[column.name] = datum.copy()
1748
1749 def insert(self, table, new_uuid=None):
1750 """Inserts and returns a new row in 'table', which must be one of the
1751 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1752
1753 The new row is assigned a provisional UUID. If 'uuid' is None then one
1754 is randomly generated; otherwise 'uuid' should specify a randomly
1755 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
1756 different UUID when 'txn' is committed, but the IDL will replace any
1757 uses of the provisional UUID in the data to be to be committed by the
1758 UUID assigned by ovsdb-server."""
1759 assert self._status == Transaction.UNCOMMITTED
1760 if new_uuid is None:
1761 new_uuid = uuid.uuid4()
1762 row = Row(self.idl, table, new_uuid, None)
1763 table.rows[row.uuid] = row
1764 self._txn_rows[row.uuid] = row
1765 return row
1766
    def _process_reply(self, msg):
        """Updates this transaction's status according to 'msg', the JSON-RPC
        reply to its "transact" request.

        Per-operation errors are classified as hard (permanent failures),
        soft (transient, worth retrying) or lock-related; the final status
        becomes ERROR, TRY_AGAIN, NOT_LOCKED or SUCCESS accordingly."""
        if msg.type == ovs.jsonrpc.Message.T_ERROR:
            self._status = Transaction.ERROR
        elif not isinstance(msg.result, (list, tuple)):
            # XXX rate-limit
            vlog.warn('reply to "transact" is not JSON array')
        else:
            hard_errors = False
            soft_errors = False
            lock_errors = False

            ops = msg.result
            for op in ops:
                if op is None:
                    # This isn't an error in itself but indicates that some
                    # prior operation failed, so make sure that we know about
                    # it.
                    soft_errors = True
                elif isinstance(op, dict):
                    error = op.get("error")
                    if error is not None:
                        if error == "timed out":
                            soft_errors = True
                        elif error == "not owner":
                            lock_errors = True
                        elif error == "aborted":
                            # Deliberate abort (e.g. dry_run); not an error.
                            pass
                        else:
                            hard_errors = True
                            self.__set_error_json(op)
                else:
                    hard_errors = True
                    self.__set_error_json(op)
                    # XXX rate-limit
                    vlog.warn("operation reply is not JSON null or object")

            if not soft_errors and not hard_errors and not lock_errors:
                # The transaction succeeded as a whole; extract the results
                # of any increment, fetch and insert sub-operations.
                if self._inc_row and not self.__process_inc_reply(ops):
                    hard_errors = True
                if self._fetch_requests:
                    if self.__process_fetch_reply(ops):
                        self.idl.change_seqno += 1
                    else:
                        hard_errors = True

                for insert in six.itervalues(self._inserted_rows):
                    if not self.__process_insert_reply(insert, ops):
                        hard_errors = True

            if hard_errors:
                self._status = Transaction.ERROR
            elif lock_errors:
                self._status = Transaction.NOT_LOCKED
            elif soft_errors:
                self._status = Transaction.TRY_AGAIN
            else:
                self._status = Transaction.SUCCESS
1824
1825 @staticmethod
1826 def __check_json_type(json, types, name):
1827 if not json:
1828 # XXX rate-limit
1829 vlog.warn("%s is missing" % name)
1830 return False
1831 elif not isinstance(json, tuple(types)):
1832 # XXX rate-limit
1833 vlog.warn("%s has unexpected type %s" % (name, type(json)))
1834 return False
1835 else:
1836 return True
1837
1838 def __process_fetch_reply(self, ops):
1839 update = False
1840 for fetch_request in self._fetch_requests:
1841 row = fetch_request["row"]
1842 column_name = fetch_request["column_name"]
1843 index = fetch_request["index"]
1844 table = row._table
1845
1846 select = ops[index]
1847 fetched_rows = select.get("rows")
1848 if not Transaction.__check_json_type(fetched_rows, (list, tuple),
1849 '"select" reply "rows"'):
1850 return False
1851 if len(fetched_rows) != 1:
1852 # XXX rate-limit
1853 vlog.warn('"select" reply "rows" has %d elements '
1854 'instead of 1' % len(fetched_rows))
1855 continue
1856 fetched_row = fetched_rows[0]
1857 if not Transaction.__check_json_type(fetched_row, (dict,),
1858 '"select" reply row'):
1859 continue
1860
1861 column = table.columns.get(column_name)
1862 datum_json = fetched_row.get(column_name)
1863 datum = data.Datum.from_json(column.type, datum_json)
1864
1865 row._data[column_name] = datum
1866 update = True
1867
1868 return update
1869
1870 def __process_inc_reply(self, ops):
1871 if self._inc_index + 2 > len(ops):
1872 # XXX rate-limit
1873 vlog.warn("reply does not contain enough operations for "
1874 "increment (has %d, needs %d)" %
1875 (len(ops), self._inc_index + 2))
1876
1877 # We know that this is a JSON object because the loop in
1878 # __process_reply() already checked.
1879 mutate = ops[self._inc_index]
1880 count = mutate.get("count")
1881 if not Transaction.__check_json_type(count, six.integer_types,
1882 '"mutate" reply "count"'):
1883 return False
1884 if count != 1:
1885 # XXX rate-limit
1886 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1887 return False
1888
1889 select = ops[self._inc_index + 1]
1890 rows = select.get("rows")
1891 if not Transaction.__check_json_type(rows, (list, tuple),
1892 '"select" reply "rows"'):
1893 return False
1894 if len(rows) != 1:
1895 # XXX rate-limit
1896 vlog.warn('"select" reply "rows" has %d elements '
1897 'instead of 1' % len(rows))
1898 return False
1899 row = rows[0]
1900 if not Transaction.__check_json_type(row, (dict,),
1901 '"select" reply row'):
1902 return False
1903 column = row.get(self._inc_column)
1904 if not Transaction.__check_json_type(column, six.integer_types,
1905 '"select" reply inc column'):
1906 return False
1907 self._inc_new_value = column
1908 return True
1909
1910 def __process_insert_reply(self, insert, ops):
1911 if insert.op_index >= len(ops):
1912 # XXX rate-limit
1913 vlog.warn("reply does not contain enough operations "
1914 "for insert (has %d, needs %d)"
1915 % (len(ops), insert.op_index))
1916 return False
1917
1918 # We know that this is a JSON object because the loop in
1919 # __process_reply() already checked.
1920 reply = ops[insert.op_index]
1921 json_uuid = reply.get("uuid")
1922 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1923 '"insert" reply "uuid"'):
1924 return False
1925
1926 try:
1927 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1928 except error.Error:
1929 # XXX rate-limit
1930 vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1931 return False
1932
1933 insert.real = uuid_
1934 return True
1935
1936
class SchemaHelper(object):
    """IDL Schema helper.

    This class encapsulates the logic required to generate schemas suitable
    for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
    they are interested in using register_columns().  When finished, the
    get_idl_schema() function may be called.

    The location on disk of the schema used may be found in the
    'schema_location' variable."""

    def __init__(self, location=None, schema_json=None):
        """Creates a new Schema object.

        'location' file path to ovs schema. None means default location
        'schema_json' schema in json representation in memory
        """

        if location and schema_json:
            raise ValueError("both location and schema_json can't be "
                             "specified. it's ambiguous.")
        if schema_json is None:
            if location is None:
                location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
            schema_json = ovs.json.from_file(location)

        self.schema_json = schema_json
        self._tables = {}    # Maps table name to the set of wanted columns.
        self._readonly = {}  # Maps table name to its read-only column list.
        self._all = False    # True means replicate every table and column.

    def register_columns(self, table, columns, readonly=None):
        """Registers interest in the given 'columns' of 'table'. Future calls
        to get_idl_schema() will include 'table':column for each column in
        'columns'. This function automatically avoids adding duplicate entries
        to the schema.
        A subset of 'columns' can be specified as 'readonly'. The readonly
        columns are not replicated but can be fetched on-demand by the user
        with Row.fetch().

        'table' must be a string.
        'columns' must be a list of strings.
        'readonly' must be a list of strings (None, the default, means no
        read-only columns).
        """

        assert isinstance(table, six.string_types)
        assert isinstance(columns, list)

        # Use a None sentinel instead of a mutable [] default so that one
        # list object is not shared across calls.
        if readonly is None:
            readonly = []

        columns = set(columns) | self._tables.get(table, set())
        self._tables[table] = columns
        self._readonly[table] = readonly

    def register_table(self, table):
        """Registers interest in all columns of 'table'.  Future calls to
        get_idl_schema() will include every column of 'table'.

        'table' must be a string
        """
        assert isinstance(table, six.string_types)
        self._tables[table] = set()  # empty set means all columns in the table

    def register_all(self):
        """Registers interest in every column of every table."""
        self._all = True

    def get_idl_schema(self):
        """Gets a schema appropriate for the creation of an 'ovs.db.idl.Idl'
        object based on columns registered using the register_columns()
        function.

        May only be called once per SchemaHelper: the parsed schema JSON is
        released afterwards."""

        schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
        self.schema_json = None

        if not self._all:
            schema_tables = {}
            for table, columns in six.iteritems(self._tables):
                schema_tables[table] = (
                    self._keep_table_columns(schema, table, columns))

            schema.tables = schema_tables
        schema.readonly = self._readonly
        return schema

    def _keep_table_columns(self, schema, table_name, columns):
        # Prunes (in place) the TableSchema for 'table_name' down to just
        # 'columns', returning it.  An empty 'columns' set keeps every
        # column in the table.
        assert table_name in schema.tables
        table = schema.tables[table_name]

        if not columns:
            # empty set means all columns in the table
            return table

        new_columns = {}
        for column_name in columns:
            assert isinstance(column_name, six.string_types)
            assert column_name in table.columns

            new_columns[column_name] = table.columns[column_name]

        table.columns = new_columns
        return table