1 # Copyright (c) 2009, 2010, 2011, 2012, 2013, 2016 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 import ovs
.db
.data
as data
25 from ovs
.db
import custom_index
26 from ovs
.db
import error
# Module-wide logger; messages from the IDL are tagged "idl".
vlog = ovs.vlog.Vlog("idl")

# Hints for the legacy pychecker static analyzer.
__pychecker__ = 'no-classattr no-objattrs'

# Value of the _Server Database table's "model" column that identifies a
# clustered (Raft) database server (see Idl.__check_server_db()).
CLUSTERED = "clustered"
45 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
47 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
48 requests to an OVSDB database server and parses the responses, converting
49 raw JSON into data structures that are easier for clients to digest.
51 The IDL also assists with issuing database transactions. The client
52 creates a transaction, manipulates the IDL data structures, and commits or
53 aborts the transaction. The IDL then composes and issues the necessary
54 JSON-RPC requests and reports to the client whether the transaction
55 completed successfully.
57 The client is allowed to access the following attributes directly, in a
60 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
61 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
62 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
65 The client may directly read and write the Row objects referenced by the
66 'rows' map values. Refer to Row for more details.
68 - 'change_seqno': A number that represents the IDL's state. When the IDL
69 is updated (by Idl.run()), its value changes. The sequence number can
70 occasionally change even if the database does not. This happens if the
71 connection to the database drops and reconnects, which causes the
72 database contents to be reloaded even if they didn't change. (It could
73 also happen if the database server sends out a "change" that reflects
74 what the IDL already thought was in the database. The database server is
75 not supposed to do that, but bugs could in theory cause it to do so.)
77 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
78 if no lock is configured.
80 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
81 lock, and False otherwise.
83 Locking and unlocking happens asynchronously from the database client's
84 point of view, so the information is only useful for optimization
85 (e.g. if the client doesn't have the lock then there's no point in trying
86 to write to the database).
88 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
89 the database server has indicated that some other client already owns the
90 requested lock, and False otherwise.
92 - 'txn': The ovs.db.idl.Transaction object for the database transaction
93 currently being constructed, if there is one, or None otherwise.
    # Connection state machine values (held in self.state): the IDL first
    # requests the _Server schema, then monitors the _Server database, then
    # requests a data monitor ("monitor_cond" when available, otherwise a
    # plain "monitor" — see __send_monitor_request()).
    IDL_S_SERVER_SCHEMA_REQUESTED = 1
    IDL_S_SERVER_MONITOR_REQUESTED = 2
    IDL_S_DATA_MONITOR_REQUESTED = 3
    IDL_S_DATA_MONITOR_COND_REQUESTED = 4
    def __init__(self, remote, schema_helper, probe_interval=None,
                 leader_only=True):
        """Creates and returns a connection to the database named 'db_name' on
        'remote', which should be in a form acceptable to
        ovs.jsonrpc.session.open().  The connection will maintain an in-memory
        replica of the remote database.

        'remote' can be comma separated multiple remotes and each remote
        should be in a form acceptable to ovs.jsonrpc.session.open().

        'schema_helper' should be an instance of the SchemaHelper class which
        generates schema for the remote database.  The caller may have cut it
        down by removing tables or columns that are not of interest.  The IDL
        will only replicate the tables and columns that remain.  The caller
        may also add an attribute named 'alert' to selected remaining columns,
        setting its value to False; if so, then changes to those columns will
        not be considered changes to the database for the purpose of the
        return value of Idl.run() and Idl.change_seqno.  This is useful for
        columns that the IDL's client will write but not read.

        As a convenience to users, 'schema' may also be an instance of the
        SchemaHelper class.

        The IDL uses and modifies 'schema' directly.

        If 'leader_only' is set to True (default value) the IDL will only
        monitor and transact with the leader of the cluster.

        If "probe_interval" is zero it disables the connection keepalive
        feature. If non-zero the value will be forced to at least 1000
        milliseconds. If None it will just use the default value in OVS.
        """
        assert isinstance(schema_helper, SchemaHelper)
        schema = schema_helper.get_idl_schema()

        self.tables = schema.tables
        self.readonly = schema.readonly
        self._db = schema
        remotes = self._parse_remotes(remote)
        self._session = ovs.jsonrpc.Session.open_multiple(
            remotes, probe_interval=probe_interval)
        self._monitor_request_id = None
        self._last_seqno = None
        self.change_seqno = 0
        self.uuid = uuid.uuid1()

        # Server monitor: the special _Server database is monitored first so
        # that cluster membership/leadership can be checked before monitoring
        # the data database itself.
        self._server_schema_request_id = None
        self._server_monitor_request_id = None
        self._db_change_aware_request_id = None
        self._server_db_name = '_Server'
        self._server_db_table = 'Database'
        self.server_tables = None
        self._server_db = None
        self.server_monitor_uuid = uuid.uuid1()
        self.leader_only = leader_only
        self.cluster_id = None
        self._min_index = 0

        self.state = self.IDL_S_INITIAL

        # Database locking.
        self.lock_name = None          # Name of lock we need, None if none.
        self.has_lock = False          # Has db server said we have the lock?
        self.is_lock_contended = False  # Has db server said we can't get lock?
        self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.

        # Transaction support.
        self.txn = None
        self._outstanding_txns = {}

        for table in six.itervalues(schema.tables):
            for column in six.itervalues(table.columns):
                if not hasattr(column, 'alert'):
                    column.alert = True
            table.need_table = False
            table.rows = custom_index.IndexedRows(table)
            table.idl = self
            # [True] is the "match every row" condition; see cond_change().
            table.condition = [True]
            table.cond_changed = False
184 def _parse_remotes(self
, remote
):
186 # "tcp:10.0.0.1:6641,unix:/tmp/db.sock,t,s,tcp:10.0.0.2:6642"
187 # this function returns
188 # ["tcp:10.0.0.1:6641", "unix:/tmp/db.sock,t,s", tcp:10.0.0.2:6642"]
190 for r
in remote
.split(','):
191 if remotes
and r
.find(":") == -1:
192 remotes
[-1] += "," + r
197 def set_cluster_id(self
, cluster_id
):
198 """Set the id of the cluster that this idl must connect to."""
199 self
.cluster_id
= cluster_id
200 if self
.state
!= self
.IDL_S_INITIAL
:
201 self
.force_reconnect()
203 def index_create(self
, table
, name
):
204 """Create a named multi-column index on a table"""
205 return self
.tables
[table
].rows
.index_create(name
)
207 def index_irange(self
, table
, name
, start
, end
):
208 """Return items in a named index between start/end inclusive"""
209 return self
.tables
[table
].rows
.indexes
[name
].irange(start
, end
)
211 def index_equal(self
, table
, name
, value
):
212 """Return items in a named index matching a value"""
213 return self
.tables
[table
].rows
.indexes
[name
].irange(value
, value
)
216 """Closes the connection to the database. The IDL will no longer
218 self
._session
.close()
221 """Processes a batch of messages from the database server. Returns
222 True if the database as seen through the IDL changed, False if it did
223 not change. The initial fetch of the entire contents of the remote
224 database is considered to be one kind of change. If the IDL has been
225 configured to acquire a database lock (with Idl.set_lock()), then
226 successfully acquiring the lock is also considered to be a change.
228 This function can return occasional false positives, that is, report
229 that the database changed even though it didn't. This happens if the
230 connection to the database drops and reconnects, which causes the
231 database contents to be reloaded even if they didn't change. (It could
232 also happen if the database server sends out a "change" that reflects
233 what we already thought was in the database, but the database server is
234 not supposed to do that.)
236 As an alternative to checking the return value, the client may check
237 for changes in self.change_seqno."""
239 initial_change_seqno
= self
.change_seqno
241 self
.send_cond_change()
246 if not self
._session
.is_connected():
249 seqno
= self
._session
.get_seqno()
250 if seqno
!= self
._last
_seqno
:
251 self
._last
_seqno
= seqno
252 self
.__txn
_abort
_all
()
253 self
.__send
_server
_schema
_request
()
255 self
.__send
_lock
_request
()
258 msg
= self
._session
.recv()
262 if (msg
.type == ovs
.jsonrpc
.Message
.T_NOTIFY
263 and msg
.method
== "update2"
264 and len(msg
.params
) == 2):
265 # Database contents changed.
266 self
.__parse
_update
(msg
.params
[1], OVSDB_UPDATE2
)
267 elif (msg
.type == ovs
.jsonrpc
.Message
.T_NOTIFY
268 and msg
.method
== "update"
269 and len(msg
.params
) == 2):
270 # Database contents changed.
271 if msg
.params
[0] == str(self
.server_monitor_uuid
):
272 self
.__parse
_update
(msg
.params
[1], OVSDB_UPDATE
,
273 tables
=self
.server_tables
)
274 self
.change_seqno
= initial_change_seqno
275 if not self
.__check
_server
_db
():
276 self
.force_reconnect()
279 self
.__parse
_update
(msg
.params
[1], OVSDB_UPDATE
)
280 elif (msg
.type == ovs
.jsonrpc
.Message
.T_REPLY
281 and self
._monitor
_request
_id
is not None
282 and self
._monitor
_request
_id
== msg
.id):
283 # Reply to our "monitor" request.
285 self
.change_seqno
+= 1
286 self
._monitor
_request
_id
= None
288 if self
.state
== self
.IDL_S_DATA_MONITOR_COND_REQUESTED
:
289 self
.__parse
_update
(msg
.result
, OVSDB_UPDATE2
)
291 assert self
.state
== self
.IDL_S_DATA_MONITOR_REQUESTED
292 self
.__parse
_update
(msg
.result
, OVSDB_UPDATE
)
294 except error
.Error
as e
:
295 vlog
.err("%s: parse error in received schema: %s"
296 % (self
._session
.get_name(), e
))
298 elif (msg
.type == ovs
.jsonrpc
.Message
.T_REPLY
299 and self
._server
_schema
_request
_id
is not None
300 and self
._server
_schema
_request
_id
== msg
.id):
301 # Reply to our "get_schema" of _Server request.
303 self
._server
_schema
_request
_id
= None
304 sh
= SchemaHelper(None, msg
.result
)
305 sh
.register_table(self
._server
_db
_table
)
306 schema
= sh
.get_idl_schema()
307 self
._server
_db
= schema
308 self
.server_tables
= schema
.tables
309 self
.__send
_server
_monitor
_request
()
310 except error
.Error
as e
:
311 vlog
.err("%s: error receiving server schema: %s"
312 % (self
._session
.get_name(), e
))
317 self
.change_seqno
= initial_change_seqno
318 self
.__send
_monitor
_request
()
319 elif (msg
.type == ovs
.jsonrpc
.Message
.T_REPLY
320 and self
._server
_monitor
_request
_id
is not None
321 and self
._server
_monitor
_request
_id
== msg
.id):
322 # Reply to our "monitor" of _Server request.
324 self
._server
_monitor
_request
_id
= None
325 self
.__parse
_update
(msg
.result
, OVSDB_UPDATE
,
326 tables
=self
.server_tables
)
327 self
.change_seqno
= initial_change_seqno
328 if self
.__check
_server
_db
():
329 self
.__send
_monitor
_request
()
330 self
.__send
_db
_change
_aware
()
332 self
.force_reconnect()
334 except error
.Error
as e
:
335 vlog
.err("%s: parse error in received schema: %s"
336 % (self
._session
.get_name(), e
))
341 self
.change_seqno
= initial_change_seqno
342 self
.__send
_monitor
_request
()
343 elif (msg
.type == ovs
.jsonrpc
.Message
.T_REPLY
344 and self
._db
_change
_aware
_request
_id
is not None
345 and self
._db
_change
_aware
_request
_id
== msg
.id):
            # Reply to us notifying the server of our change awareness.
347 self
._db
_change
_aware
_request
_id
= None
348 elif (msg
.type == ovs
.jsonrpc
.Message
.T_REPLY
349 and self
._lock
_request
_id
is not None
350 and self
._lock
_request
_id
== msg
.id):
351 # Reply to our "lock" request.
352 self
.__parse
_lock
_reply
(msg
.result
)
353 elif (msg
.type == ovs
.jsonrpc
.Message
.T_NOTIFY
354 and msg
.method
== "locked"):
356 self
.__parse
_lock
_notify
(msg
.params
, True)
357 elif (msg
.type == ovs
.jsonrpc
.Message
.T_NOTIFY
358 and msg
.method
== "stolen"):
359 # Someone else stole our lock.
360 self
.__parse
_lock
_notify
(msg
.params
, False)
361 elif msg
.type == ovs
.jsonrpc
.Message
.T_NOTIFY
and msg
.id == "echo":
362 # Reply to our echo request. Ignore it.
364 elif (msg
.type == ovs
.jsonrpc
.Message
.T_ERROR
and
365 self
.state
== self
.IDL_S_DATA_MONITOR_COND_REQUESTED
and
366 self
._monitor
_request
_id
== msg
.id):
367 if msg
.error
== "unknown method":
368 self
.__send
_monitor
_request
()
369 elif (msg
.type == ovs
.jsonrpc
.Message
.T_ERROR
and
370 self
._server
_schema
_request
_id
is not None and
371 self
._server
_schema
_request
_id
== msg
.id):
372 self
._server
_schema
_request
_id
= None
374 self
.force_reconnect()
377 self
.change_seqno
= initial_change_seqno
378 self
.__send
_monitor
_request
()
379 elif (msg
.type in (ovs
.jsonrpc
.Message
.T_ERROR
,
380 ovs
.jsonrpc
.Message
.T_REPLY
)
381 and self
.__txn
_process
_reply
(msg
)):
382 # __txn_process_reply() did everything needed.
385 # This can happen if a transaction is destroyed before we
386 # receive the reply, so keep the log level low.
387 vlog
.dbg("%s: received unexpected %s message"
388 % (self
._session
.get_name(),
389 ovs
.jsonrpc
.Message
.type_to_string(msg
.type)))
391 return initial_change_seqno
!= self
.change_seqno
    def send_cond_change(self):
        # Push an updated monitor condition for every table whose condition
        # changed since the last push.  No-op while disconnected: conditions
        # are included in the monitor request on reconnect instead.
        if not self._session.is_connected():
            return

        for table in six.itervalues(self.tables):
            if table.cond_changed:
                self.__send_cond_change(table, table.condition)
                table.cond_changed = False
    def cond_change(self, table_name, cond):
        """Sets the condition for 'table_name' to 'cond', which should be a
        conditional expression suitable for use directly in the OVSDB
        protocol, with the exception that the empty condition []
        matches no rows (instead of matching every row).  That is, []
        is equivalent to [False], not to [True].

        Raises error.Error if 'table_name' is not a monitored table."""
        table = self.tables.get(table_name)
        if not table:
            raise error.Error('Unknown table "%s"' % table_name)

        if cond == []:
            # Per the docstring: [] means "match no rows".
            cond = [False]
        if table.condition != cond:
            table.condition = cond
            # Marked dirty; transmitted by the next send_cond_change() call.
            table.cond_changed = True
    def wait(self, poller):
        """Arranges for poller.block() to wake up when self.run() has something
        to do or when activity occurs on a transaction on 'self'."""
        # Wake on session (reconnect/keepalive) activity and on inbound data.
        self._session.wait(poller)
        self._session.recv_wait(poller)
426 def has_ever_connected(self
):
427 """Returns True, if the IDL successfully connected to the remote
428 database and retrieved its contents (even if the connection
429 subsequently dropped and is in the process of reconnecting). If so,
430 then the IDL contains an atomic snapshot of the database's contents
431 (but it might be arbitrarily old if the connection dropped).
433 Returns False if the IDL has never connected or retrieved the
434 database's contents. If so, the IDL is empty."""
435 return self
.change_seqno
!= 0
    def force_reconnect(self):
        """Forces the IDL to drop its connection to the database and reconnect.
        In the meantime, the contents of the IDL will not change."""
        self._session.force_reconnect()
    def session_name(self):
        # Human-readable name of the underlying JSON-RPC session, used in
        # log messages throughout this module.
        return self._session.get_name()
    def set_lock(self, lock_name):
        """If 'lock_name' is not None, configures the IDL to obtain the named
        lock from the database server and to avoid modifying the database when
        the lock cannot be acquired (that is, when another client has the same
        lock).

        If 'lock_name' is None, drops the locking requirement and releases the
        lock."""
        # Changing the lock while transactions are in flight is not allowed.
        assert not self.txn
        assert not self._outstanding_txns

        if self.lock_name and (not lock_name or lock_name != self.lock_name):
            # Release previous lock.
            self.__send_unlock_request()
            self.lock_name = None
            self.is_lock_contended = False

        if lock_name and not self.lock_name:
            # Acquire new lock.
            self.lock_name = lock_name
            self.__send_lock_request()
    def notify(self, event, row, updates=None):
        """Hook for implementing create/update/delete notifications.

        Default implementation does nothing; subclasses override it.

        :param event:   The event that was triggered
        :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
        :param row:     The row as it is after the operation has occurred
        :type row:      Row
        :param updates: For updates, row with only old values of the changed
                        columns
        :type updates:  Row
        """
479 def __send_cond_change(self
, table
, cond
):
480 monitor_cond_change
= {table
.name
: [{"where": cond
}]}
481 old_uuid
= str(self
.uuid
)
482 self
.uuid
= uuid
.uuid1()
483 params
= [old_uuid
, str(self
.uuid
), monitor_cond_change
]
484 msg
= ovs
.jsonrpc
.Message
.create_request("monitor_cond_change", params
)
485 self
._session
.send(msg
)
490 for table
in six
.itervalues(self
.tables
):
493 table
.rows
= custom_index
.IndexedRows(table
)
496 self
.change_seqno
+= 1
    def __update_has_lock(self, new_has_lock):
        # Record lock ownership as reported by the server, bumping
        # change_seqno on a fresh acquisition so that run() reports a change.
        if new_has_lock and not self.has_lock:
            if self._monitor_request_id is None:
                self.change_seqno += 1
            else:
                # We're waiting for a monitor reply, so don't signal that the
                # database changed.  The monitor reply will increment
                # change_seqno anyhow.
                pass
            self.is_lock_contended = False
        self.has_lock = new_has_lock
    def __do_send_lock_request(self, method):
        # Shared implementation for "lock" and "unlock" requests.  Returns
        # the JSON-RPC id of the sent message, or None when disconnected
        # (run() resends the lock request on reconnect).
        self.__update_has_lock(False)
        self._lock_request_id = None
        if self._session.is_connected():
            msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
            msg_id = msg.id
            self._session.send(msg)
        else:
            msg_id = None
        return msg_id
    def __send_lock_request(self):
        # Request our configured lock; the reply is matched against
        # _lock_request_id in run().
        self._lock_request_id = self.__do_send_lock_request("lock")
    def __send_unlock_request(self):
        # Release the lock; the reply id is intentionally not tracked.
        self.__do_send_lock_request("unlock")
    def __parse_lock_reply(self, result):
        # Handle the server's reply to our "lock" request; the reply body is
        # expected to be a dict of the form {"locked": bool}.
        self._lock_request_id = None
        got_lock = isinstance(result, dict) and result.get("locked") is True
        self.__update_has_lock(got_lock)
        if not got_lock:
            self.is_lock_contended = True
    def __parse_lock_notify(self, params, new_has_lock):
        # Handle a "locked" (new_has_lock=True) or "stolen" (False)
        # notification.  Ignored unless 'params' names our configured lock.
        if (self.lock_name is not None
                and isinstance(params, (list, tuple))
                and params
                and params[0] == self.lock_name):
            self.__update_has_lock(new_has_lock)
            if not new_has_lock:
                self.is_lock_contended = True
    def __send_db_change_aware(self):
        # Tell the server that we understand database-change notifications so
        # that it may use the more efficient notification scheme.
        msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
                                                 [True])
        self._db_change_aware_request_id = msg.id
        self._session.send(msg)
549 def __send_monitor_request(self
):
550 if (self
.state
in [self
.IDL_S_SERVER_MONITOR_REQUESTED
,
551 self
.IDL_S_INITIAL
]):
552 self
.state
= self
.IDL_S_DATA_MONITOR_COND_REQUESTED
553 method
= "monitor_cond"
555 self
.state
= self
.IDL_S_DATA_MONITOR_REQUESTED
558 monitor_requests
= {}
559 for table
in six
.itervalues(self
.tables
):
561 for column
in six
.iterkeys(table
.columns
):
562 if ((table
.name
not in self
.readonly
) or
563 (table
.name
in self
.readonly
) and
564 (column
not in self
.readonly
[table
.name
])):
565 columns
.append(column
)
566 monitor_request
= {"columns": columns
}
567 if method
== "monitor_cond" and table
.condition
!= [True]:
568 monitor_request
["where"] = table
.condition
569 table
.cond_change
= False
570 monitor_requests
[table
.name
] = [monitor_request
]
572 msg
= ovs
.jsonrpc
.Message
.create_request(
573 method
, [self
._db
.name
, str(self
.uuid
), monitor_requests
])
574 self
._monitor
_request
_id
= msg
.id
575 self
._session
.send(msg
)
577 def __send_server_schema_request(self
):
578 self
.state
= self
.IDL_S_SERVER_SCHEMA_REQUESTED
579 msg
= ovs
.jsonrpc
.Message
.create_request(
580 "get_schema", [self
._server
_db
_name
, str(self
.uuid
)])
581 self
._server
_schema
_request
_id
= msg
.id
582 self
._session
.send(msg
)
    def __send_server_monitor_request(self):
        # Monitor the Database table of the _Server database so that cluster
        # membership/leadership can be checked (__check_server_db()) before
        # the data database itself is monitored.
        self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
        monitor_requests = {}
        table = self.server_tables[self._server_db_table]
        columns = [column for column in six.iterkeys(table.columns)]
        for column in six.itervalues(table.columns):
            if not hasattr(column, 'alert'):
                column.alert = True
        table.rows = custom_index.IndexedRows(table)
        table.need_table = False
        table.idl = self
        monitor_request = {"columns": columns}
        monitor_requests[table.name] = [monitor_request]
        msg = ovs.jsonrpc.Message.create_request(
            'monitor', [self._server_db.name,
                        str(self.server_monitor_uuid),
                        monitor_requests])
        self._server_monitor_request_id = msg.id
        self._session.send(msg)
    def __parse_update(self, update, version, tables=None):
        # Apply a "update"/"update2" notification body, defaulting to the
        # data tables; 'tables' overrides this for _Server updates.  Parse
        # errors are logged rather than propagated.
        try:
            if not tables:
                self.__do_parse_update(update, version, self.tables)
            else:
                self.__do_parse_update(update, version, tables)
        except error.Error as e:
            vlog.err("%s: error parsing update: %s"
                     % (self._session.get_name(), e))
    def __do_parse_update(self, table_updates, version, tables):
        # Validate and apply a <table-updates> object; 'version' selects the
        # OVSDB_UPDATE (old/new rows) or OVSDB_UPDATE2 (diff) format.
        # Raises error.Error on malformed input.
        if not isinstance(table_updates, dict):
            raise error.Error("<table-updates> is not an object",
                              table_updates)

        for table_name, table_update in six.iteritems(table_updates):
            table = tables.get(table_name)
            if not table:
                raise error.Error('<table-updates> includes unknown '
                                  'table "%s"' % table_name)

            if not isinstance(table_update, dict):
                raise error.Error('<table-update> for table "%s" is not '
                                  'an object' % table_name, table_update)

            for uuid_string, row_update in six.iteritems(table_update):
                if not ovs.ovsuuid.is_valid_string(uuid_string):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains bad UUID "%s" as member '
                                      'name' % (table_name, uuid_string),
                                      table_update)
                uuid = ovs.ovsuuid.from_string(uuid_string)

                if not isinstance(row_update, dict):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains <row-update> for %s that '
                                      'is not an object'
                                      % (table_name, uuid_string))

                if version == OVSDB_UPDATE2:
                    # "update2" carries per-row diffs.
                    if self.__process_update2(table, uuid, row_update):
                        self.change_seqno += 1
                    continue

                # Classic "update": explicit optional "old"/"new" rows.
                parser = ovs.db.parser.Parser(row_update, "row-update")
                old = parser.get_optional("old", [dict])
                new = parser.get_optional("new", [dict])
                parser.finish()

                if not old and not new:
                    raise error.Error('<row-update> missing "old" and '
                                      '"new" members', row_update)

                if self.__process_update(table, uuid, old, new):
                    self.change_seqno += 1
660 def __process_update2(self
, table
, uuid
, row_update
):
661 row
= table
.rows
.get(uuid
)
663 if "delete" in row_update
:
666 self
.notify(ROW_DELETE
, row
)
670 vlog
.warn("cannot delete missing row %s from table"
671 "%s" % (uuid
, table
.name
))
672 elif "insert" in row_update
or "initial" in row_update
:
674 vlog
.warn("cannot add existing row %s from table"
675 " %s" % (uuid
, table
.name
))
677 row
= self
.__create
_row
(table
, uuid
)
678 if "insert" in row_update
:
679 row_update
= row_update
['insert']
681 row_update
= row_update
['initial']
682 self
.__add
_default
(table
, row_update
)
683 changed
= self
.__row
_update
(table
, row
, row_update
)
684 table
.rows
[uuid
] = row
686 self
.notify(ROW_CREATE
, row
)
687 elif "modify" in row_update
:
689 raise error
.Error('Modify non-existing row')
691 old_row
= self
.__apply
_diff
(table
, row
, row_update
['modify'])
692 self
.notify(ROW_UPDATE
, row
, Row(self
, table
, uuid
, old_row
))
695 raise error
.Error('<row-update> unknown operation',
    def __process_update(self, table, uuid, old, new):
        """Returns True if a column changed, False otherwise."""
        row = table.rows.get(uuid)
        changed = False
        if not new:
            # Delete row.
            if row:
                del table.rows[uuid]
                changed = True
                self.notify(ROW_DELETE, row)
            else:
                # XXX rate-limit
                vlog.warn("cannot delete missing row %s from table %s"
                          % (uuid, table.name))
        elif not old:
            # Insert row.
            op = ROW_CREATE
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
            else:
                # XXX rate-limit
                op = ROW_UPDATE
                vlog.warn("cannot add existing row %s to table %s"
                          % (uuid, table.name))
            changed |= self.__row_update(table, row, new)
            if op == ROW_CREATE:
                table.rows[uuid] = row
            if changed:
                self.notify(ROW_CREATE, row)
        else:
            # Modify row (creating it first if the server thinks we already
            # have it but we don't).
            op = ROW_UPDATE
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
                op = ROW_CREATE
                # XXX rate-limit
                vlog.warn("cannot modify missing row %s in table %s"
                          % (uuid, table.name))
            changed |= self.__row_update(table, row, new)
            if op == ROW_CREATE:
                table.rows[uuid] = row
            if changed:
                self.notify(op, row, Row.from_json(self, table, uuid, old))
        return changed
    def __check_server_db(self):
        """Returns True if this is a valid server database, False otherwise."""
        session_name = self.session_name()

        if self._server_db_table not in self.server_tables:
            vlog.info("%s: server does not have %s table in its %s database"
                      % (session_name, self._server_db_table,
                         self._server_db_name))
            return False

        rows = self.server_tables[self._server_db_table].rows

        # Find the _Server Database row for our database, either by cluster
        # id prefix (when set) or by database name.
        database = None
        for row in six.itervalues(rows):
            if self.cluster_id:
                # Cluster ids are matched on their first four characters.
                if self.cluster_id in \
                   map(lambda x: str(x)[:4], row.cid):
                    database = row
                    break
            elif row.name == self._db.name:
                database = row
                break

        if not database:
            vlog.info("%s: server does not have %s database"
                      % (session_name, self._db.name))
            return False

        # Additional health checks apply only to clustered databases with
        # multiple candidate remotes.
        if (database.model == CLUSTERED and
                self._session.get_num_of_remotes() > 1):
            if not database.schema:
                vlog.info('%s: clustered database server has not yet joined '
                          'cluster; trying another server' % session_name)
                return False
            if not database.connected:
                vlog.info('%s: clustered database server is disconnected '
                          'from cluster; trying another server'
                          % session_name)
                return False
            if (self.leader_only and
                    not database.leader):
                vlog.info('%s: clustered database server is not cluster '
                          'leader; trying another server' % session_name)
                return False
            if database.index:
                # Refuse to go backwards: a server whose Raft log index is
                # below the highest one we have seen has stale data.
                if database.index[0] < self._min_index:
                    vlog.warn('%s: clustered database server has stale data; '
                              'trying another server' % session_name)
                    return False
                self._min_index = database.index[0]

        return True
    def __column_name(self, column):
        # Default value used by __add_default() for an absent column: the
        # column type's default, with UUID defaults rendered in JSON form.
        if column.type.key.type == ovs.db.types.UuidType:
            return ovs.ovsuuid.to_json(column.type.key.type.default)
        else:
            return column.type.key.type.default
803 def __add_default(self
, table
, row_update
):
804 for column
in six
.itervalues(table
.columns
):
805 if column
.name
not in row_update
:
806 if ((table
.name
not in self
.readonly
) or
807 (table
.name
in self
.readonly
) and
808 (column
.name
not in self
.readonly
[table
.name
])):
809 if column
.type.n_min
!= 0 and not column
.type.is_map():
810 row_update
[column
.name
] = self
.__column
_name
(column
)
    def __apply_diff(self, table, row, row_diff):
        # Apply an "update2"-style diff to 'row', returning a dict mapping
        # each diffed column name to its previous Datum (used by the caller
        # to build the "updates" Row for notify()).
        old_row = {}
        for column_name, datum_diff_json in six.iteritems(row_diff):
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum_diff = data.Datum.from_json(column.type,
                                                  datum_diff_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            old_row[column_name] = row._data[column_name].copy()
            datum = row._data[column_name].diff(datum_diff)
            if datum != row._data[column_name]:
                row._data[column_name] = datum

        return old_row
    def __row_update(self, table, row, row_json):
        # Overwrite row._data from a JSON row; returns True when a column
        # with alerting enabled actually changed value.
        changed = False
        for column_name, datum_json in six.iteritems(row_json):
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum = data.Datum.from_json(column.type, datum_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            if datum != row._data[column_name]:
                row._data[column_name] = datum
                if column.alert:
                    changed = True
            else:
                # Didn't really change but the OVSDB monitor protocol always
                # includes every value in a row.
                pass
        return changed
865 def __create_row(self
, table
, uuid
):
867 for column
in six
.itervalues(table
.columns
):
868 data
[column
.name
] = ovs
.db
.data
.Datum
.default(column
.type)
869 return Row(self
, table
, uuid
, data
)
872 self
._session
.force_reconnect()
874 def __txn_abort_all(self
):
875 while self
._outstanding
_txns
:
876 txn
= self
._outstanding
_txns
.popitem()[1]
877 txn
._status
= Transaction
.TRY_AGAIN
    def __txn_process_reply(self, msg):
        # Dispatch a JSON-RPC reply to the transaction that issued the
        # request, if any.  Returns True when 'msg' was consumed, a falsy
        # value (None) otherwise.
        txn = self._outstanding_txns.pop(msg.id, None)
        if txn:
            txn._process_reply(msg)
            return True
def _uuid_to_row(atom, base):
    # Convert a UUID atom into the referenced Row when 'base' names a
    # reference table; otherwise pass the atom through unchanged.
    if base.ref_table:
        return base.ref_table.rows.get(atom)
    else:
        return atom
def _row_to_uuid(value):
    """Map a Row back to its UUID; any other value passes through as-is."""
    return value.uuid if isinstance(value, Row) else value
900 @functools.total_ordering
902 """A row within an IDL.
904 The client may access the following attributes directly:
906 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
908 - An attribute for each column in the Row's table, named for the column,
909 whose values are as returned by Datum.to_python() for the column's type.
911 If some error occurs (e.g. the database server's idea of the column is
    different from the IDL's idea), then the attribute value is the
    "default" value returned by Datum.default() for the column's type.  (It is
    important to know this because the default value may violate constraints
    for the column's type, e.g. the default integer value is 0 even if column
    constraints require the column's value to be positive.)
918 When a transaction is active, column attributes may also be assigned new
919 values. Committing the transaction will then cause the new value to be
920 stored into the database.
922 *NOTE*: In the current implementation, the value of a column is a *copy*
923 of the value in the database. This means that modifying its value
924 directly will have no useful effect. For example, the following:
925 row.mycolumn["a"] = "b" # don't do this
926 will not change anything in the database, even after commit. To modify
927 the column, instead assign the modified column value back to the column:
    def __init__(self, idl, table, uuid, data):
        # All of the explicit references to self.__dict__ below are required
        # to set real attributes without invoking self.__getattr__().
        self.__dict__["uuid"] = uuid

        self.__dict__["_idl"] = idl
        self.__dict__["_table"] = table

        # _data is the committed data.  It takes the following values:
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     exists in the committed form of the database.
        #
        #   - None, if this row is newly inserted within the active
        #     transaction and thus has no committed form.
        self.__dict__["_data"] = data

        # _changes describes changes to this row within the active
        # transaction.  It takes the following values:
        #
        #   - {}, the empty dictionary, if no transaction is active or if the
        #     row has yet not been changed within this transaction.
        #
        #   - A dictionary that maps a column name to its new Datum, if an
        #     active transaction changes those columns' values.
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     is newly inserted within the active transaction.
        #
        #   - None, if this transaction deletes this row.
        self.__dict__["_changes"] = {}

        # _mutations describes changes to this row to be handled via a
        # mutate operation on the wire.  It takes the following values:
        #
        #   - {}, the empty dictionary, if no transaction is active or if the
        #     row has yet not been mutated within this transaction.
        #
        #   - A dictionary that contains two keys:
        #
        #     - "_inserts" contains a dictionary that maps column names to
        #       new keys/key-value pairs that should be inserted into the
        #       column
        #     - "_removes" contains a dictionary that maps column names to
        #       the keys/key-value pairs that should be removed from the
        #       column
        #
        #   - None, if this transaction deletes this row.
        self.__dict__["_mutations"] = {}

        # A dictionary whose keys are the names of columns that must be
        # verified as prerequisites when the transaction commits.  The values
        # in the dictionary are all None.
        self.__dict__["_prereqs"] = {}
987 def __lt__(self
, other
):
988 if not isinstance(other
, Row
):
989 return NotImplemented
990 return bool(self
.__dict
__['uuid'] < other
.__dict
__['uuid'])
992 def __eq__(self
, other
):
993 if not isinstance(other
, Row
):
994 return NotImplemented
995 return bool(self
.__dict
__['uuid'] == other
.__dict
__['uuid'])
998 return int(self
.__dict
__['uuid'])
1001 return "{table}({data})".format(
1002 table
=self
._table
.name
,
1003 data
=", ".join("{col}={val}".format(col
=c
, val
=getattr(self
, c
))
1004 for c
in sorted(self
._table
.columns
)))
1006 def __getattr__(self
, column_name
):
1007 assert self
._changes
is not None
1008 assert self
._mutations
is not None
1011 column
= self
._table
.columns
[column_name
]
1013 raise AttributeError("%s instance has no attribute '%s'" %
1014 (self
.__class
__.__name
__, column_name
))
1015 datum
= self
._changes
.get(column_name
)
1017 if '_inserts' in self
._mutations
.keys():
1018 inserts
= self
._mutations
['_inserts'].get(column_name
)
1020 if '_removes' in self
._mutations
.keys():
1021 removes
= self
._mutations
['_removes'].get(column_name
)
1023 if self
._data
is None:
1025 raise AttributeError("%s instance has no attribute '%s'" %
1026 (self
.__class
__.__name
__,
1029 datum
= data
.Datum
.from_python(column
.type,
1032 elif column_name
in self
._data
:
1033 datum
= self
._data
[column_name
]
1034 if column
.type.is_set():
1035 dlist
= datum
.as_list()
1036 if inserts
is not None:
1037 dlist
.extend(list(inserts
))
1038 if removes
is not None:
1039 removes_datum
= data
.Datum
.from_python(column
.type,
1042 removes_list
= removes_datum
.as_list()
1043 dlist
= [x
for x
in dlist
if x
not in removes_list
]
1044 datum
= data
.Datum
.from_python(column
.type, dlist
,
1046 elif column
.type.is_map():
1047 dmap
= datum
.to_python(_uuid_to_row
)
1048 if inserts
is not None:
1049 dmap
.update(inserts
)
1050 if removes
is not None:
1052 if key
not in (inserts
or {}):
1054 datum
= data
.Datum
.from_python(column
.type, dmap
,
1058 raise AttributeError("%s instance has no attribute '%s'" %
1059 (self
.__class
__.__name
__,
1064 return datum
.to_python(_uuid_to_row
)
1066 def __setattr__(self
, column_name
, value
):
1067 assert self
._changes
is not None
1068 assert self
._idl
.txn
1070 if ((self
._table
.name
in self
._idl
.readonly
) and
1071 (column_name
in self
._idl
.readonly
[self
._table
.name
])):
1072 vlog
.warn("attempting to write to readonly column %s"
1076 column
= self
._table
.columns
[column_name
]
1078 datum
= data
.Datum
.from_python(column
.type, value
, _row_to_uuid
)
1079 except error
.Error
as e
:
1081 vlog
.err("attempting to write bad value to column %s (%s)"
1084 # Remove prior version of the Row from the index if it has the indexed
1085 # column set, and the column changing is an indexed column
1086 if hasattr(self
, column_name
):
1087 for idx
in self
._table
.rows
.indexes
.values():
1088 if column_name
in (c
.column
for c
in idx
.columns
):
1090 self
._idl
.txn
._write
(self
, column
, datum
)
1091 for idx
in self
._table
.rows
.indexes
.values():
1092 # Only update the index if indexed columns change
1093 if column_name
in (c
.column
for c
in idx
.columns
):
1096 def addvalue(self
, column_name
, key
):
1097 self
._idl
.txn
._txn
_rows
[self
.uuid
] = self
1098 column
= self
._table
.columns
[column_name
]
1100 data
.Datum
.from_python(column
.type, key
, _row_to_uuid
)
1101 except error
.Error
as e
:
1103 vlog
.err("attempting to write bad value to column %s (%s)"
1106 inserts
= self
._mutations
.setdefault('_inserts', {})
1107 column_value
= inserts
.setdefault(column_name
, set())
1108 column_value
.add(key
)
1110 def delvalue(self
, column_name
, key
):
1111 self
._idl
.txn
._txn
_rows
[self
.uuid
] = self
1112 column
= self
._table
.columns
[column_name
]
1114 data
.Datum
.from_python(column
.type, key
, _row_to_uuid
)
1115 except error
.Error
as e
:
1117 vlog
.err("attempting to delete bad value from column %s (%s)"
1120 removes
= self
._mutations
.setdefault('_removes', {})
1121 column_value
= removes
.setdefault(column_name
, set())
1122 column_value
.add(key
)
1124 def setkey(self
, column_name
, key
, value
):
1125 self
._idl
.txn
._txn
_rows
[self
.uuid
] = self
1126 column
= self
._table
.columns
[column_name
]
1128 data
.Datum
.from_python(column
.type, {key
: value
}, _row_to_uuid
)
1129 except error
.Error
as e
:
1131 vlog
.err("attempting to write bad value to column %s (%s)"
1134 if self
._data
and column_name
in self
._data
:
1135 # Remove existing key/value before updating.
1136 removes
= self
._mutations
.setdefault('_removes', {})
1137 column_value
= removes
.setdefault(column_name
, set())
1138 column_value
.add(key
)
1139 inserts
= self
._mutations
.setdefault('_inserts', {})
1140 column_value
= inserts
.setdefault(column_name
, {})
1141 column_value
[key
] = value
1143 def delkey(self
, column_name
, key
, value
=None):
1144 self
._idl
.txn
._txn
_rows
[self
.uuid
] = self
1147 old_value
= data
.Datum
.to_python(self
._data
[column_name
],
1151 if key
not in old_value
:
1153 if old_value
[key
] != value
:
1155 removes
= self
._mutations
.setdefault('_removes', {})
1156 column_value
= removes
.setdefault(column_name
, set())
1157 column_value
.add(key
)
1161 def from_json(cls
, idl
, table
, uuid
, row_json
):
1163 for column_name
, datum_json
in six
.iteritems(row_json
):
1164 column
= table
.columns
.get(column_name
)
1167 vlog
.warn("unknown column %s in table %s"
1168 % (column_name
, table
.name
))
1171 datum
= ovs
.db
.data
.Datum
.from_json(column
.type, datum_json
)
1172 except error
.Error
as e
:
1174 vlog
.warn("error parsing column %s in table %s: %s"
1175 % (column_name
, table
.name
, e
))
1177 data
[column_name
] = datum
1178 return cls(idl
, table
, uuid
, data
)
1180 def verify(self
, column_name
):
1181 """Causes the original contents of column 'column_name' in this row to
1182 be verified as a prerequisite to completing the transaction. That is,
1183 if 'column_name' changed in this row (or if this row was deleted)
1184 between the time that the IDL originally read its contents and the time
1185 that the transaction commits, then the transaction aborts and
1186 Transaction.commit() returns Transaction.TRY_AGAIN.
1188 The intention is that, to ensure that no transaction commits based on
1189 dirty reads, an application should call Row.verify() on each data item
1190 read as part of a read-modify-write operation.
1192 In some cases Row.verify() reduces to a no-op, because the current
1193 value of the column is already known:
1195 - If this row is a row created by the current transaction (returned
1196 by Transaction.insert()).
1198 - If the column has already been modified within the current
1201 Because of the latter property, always call Row.verify() *before*
1202 modifying the column, for a given read-modify-write.
1204 A transaction must be in progress."""
1205 assert self
._idl
.txn
1206 assert self
._changes
is not None
1207 if not self
._data
or column_name
in self
._changes
:
1210 self
._prereqs
[column_name
] = None
1213 """Deletes this row from its table.
1215 A transaction must be in progress."""
1216 assert self
._idl
.txn
1217 assert self
._changes
is not None
1218 if self
._data
is None:
1219 del self
._idl
.txn
._txn
_rows
[self
.uuid
]
1221 self
._idl
.txn
._txn
_rows
[self
.uuid
] = self
1222 del self
._table
.rows
[self
.uuid
]
1223 self
.__dict
__["_changes"] = None
1225 def fetch(self
, column_name
):
1226 self
._idl
.txn
._fetch
(self
, column_name
)
1228 def increment(self
, column_name
):
1229 """Causes the transaction, when committed, to increment the value of
1230 'column_name' within this row by 1. 'column_name' must have an integer
1231 type. After the transaction commits successfully, the client may
1232 retrieve the final (incremented) value of 'column_name' with
1233 Transaction.get_increment_new_value().
1235 The client could accomplish something similar by reading and writing
1236 and verify()ing columns. However, increment() will never (by itself)
1237 cause a transaction to fail because of a verify error.
1239 The intended use is for incrementing the "next_cfg" column in
1240 the Open_vSwitch table."""
1241 self
._idl
.txn
._increment
(self
, column_name
)
def _uuid_name_from_uuid(uuid):
    """Return a JSON-RPC "named-uuid" identifier for 'uuid'.

    Hyphens are not valid in <id> tokens, so they become underscores."""
    return "row" + str(uuid).replace("-", "_")
def _where_uuid_equals(uuid):
    """Build an OVSDB "where" clause that matches the row whose _uuid column
    equals 'uuid'."""
    return [["_uuid", "==", ["uuid", str(uuid)]]]
class _InsertedRow(object):
    """Bookkeeping for a row inserted by a Transaction.

    'op_index' is the index of the "insert" operation within the
    transaction's "transact" request, used to locate the matching reply.
    'real' holds the permanent UUID the server assigned, filled in when the
    reply is processed (see Transaction.get_insert_uuid())."""

    def __init__(self, op_index):
        self.op_index = op_index
        # Initialize 'real' so get_insert_uuid() returns None instead of
        # raising AttributeError when the server's reply never supplied a
        # permanent UUID.
        self.real = None
1258 class Transaction(object):
1259 """A transaction may modify the contents of a database by modifying the
1260 values of columns, deleting rows, inserting rows, or adding checks that
1261 columns in the database have not changed ("verify" operations), through
1264 Reading and writing columns and inserting and deleting rows are all
1265 straightforward. The reasons to verify columns are less obvious.
1266 Verification is the key to maintaining transactional integrity. Because
1267 OVSDB handles multiple clients, it can happen that between the time that
1268 OVSDB client A reads a column and writes a new value, OVSDB client B has
1269 written that column. Client A's write should not ordinarily overwrite
1270 client B's, especially if the column in question is a "map" column that
1271 contains several more or less independent data items. If client A adds a
1272 "verify" operation before it writes the column, then the transaction fails
1273 in case client B modifies it first. Client A will then see the new value
1274 of the column and compose a new transaction based on the new contents
1275 written by client B.
1277 When a transaction is complete, which must be before the next call to
1278 Idl.run(), call Transaction.commit() or Transaction.abort().
1280 The life-cycle of a transaction looks like this:
1282 1. Create the transaction and record the initial sequence number:
1284 seqno = idl.change_seqno(idl)
1285 txn = Transaction(idl)
1287 2. Modify the database with Row and Transaction methods.
1289 3. Commit the transaction by calling Transaction.commit(). The first call
1290 to this function probably returns Transaction.INCOMPLETE. The client
1291 must keep calling again along as this remains true, calling Idl.run() in
1292 between to let the IDL do protocol processing. (If the client doesn't
1293 have anything else to do in the meantime, it can use
1294 Transaction.commit_block() to avoid having to loop itself.)
1296 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
1297 to change from the saved 'seqno' (it's possible that it's already
1298 changed, in which case the client should not wait at all), then start
1299 over from step 1. Only a call to Idl.run() will change the return value
1300 of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""
1302 # Status values that Transaction.commit() can return.
1304 # Not yet committed or aborted.
1305 UNCOMMITTED
= "uncommitted"
1306 # Transaction didn't include any changes.
1307 UNCHANGED
= "unchanged"
1308 # Commit in progress, please wait.
1309 INCOMPLETE
= "incomplete"
1310 # ovsdb_idl_txn_abort() called.
1312 # Commit successful.
1314 # Commit failed because a "verify" operation
1315 # reported an inconsistency, due to a network
1316 # problem, or other transient failure. Wait
1317 # for a change, then try again.
1318 TRY_AGAIN
= "try again"
1319 # Server hasn't given us the lock yet.
1320 NOT_LOCKED
= "not locked"
1321 # Commit failed due to a hard error.
1325 def status_to_string(status
):
1326 """Converts one of the status values that Transaction.commit() can
1327 return into a human-readable string.
1329 (The status values are in fact such strings already, so
1330 there's nothing to do.)"""
1333 def __init__(self
, idl
):
1334 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
1335 A given Idl may only have a single active transaction at a time.
1337 A Transaction may modify the contents of a database by assigning new
1338 values to columns (attributes of Row), deleting rows (with
1339 Row.delete()), or inserting rows (with Transaction.insert()). It may
1340 also check that columns in the database have not changed with
1343 When a transaction is complete (which must be before the next call to
1344 Idl.run()), call Transaction.commit() or Transaction.abort()."""
1345 assert idl
.txn
is None
1348 self
._request
_id
= None
1350 self
.dry_run
= False
1352 self
._status
= Transaction
.UNCOMMITTED
1356 self
._inc
_row
= None
1357 self
._inc
_column
= None
1359 self
._fetch
_requests
= []
1361 self
._inserted
_rows
= {} # Map from UUID to _InsertedRow
1363 def add_comment(self
, comment
):
1364 """Appends 'comment' to the comments that will be passed to the OVSDB
1365 server when this transaction is committed. (The comment will be
1366 committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
1367 relatively human-readable form.)"""
1368 self
._comments
.append(comment
)
1370 def wait(self
, poller
):
1371 """Causes poll_block() to wake up if this transaction has completed
1373 if self
._status
not in (Transaction
.UNCOMMITTED
,
1374 Transaction
.INCOMPLETE
):
1375 poller
.immediate_wake()
1377 def _substitute_uuids(self
, json
):
1378 if isinstance(json
, (list, tuple)):
1380 and json
[0] == 'uuid'
1381 and ovs
.ovsuuid
.is_valid_string(json
[1])):
1382 uuid
= ovs
.ovsuuid
.from_string(json
[1])
1383 row
= self
._txn
_rows
.get(uuid
, None)
1384 if row
and row
._data
is None:
1385 return ["named-uuid", _uuid_name_from_uuid(uuid
)]
1387 return [self
._substitute
_uuids
(elem
) for elem
in json
]
1390 def __disassemble(self
):
1393 for row
in six
.itervalues(self
._txn
_rows
):
1394 if row
._changes
is None:
1395 # If we add the deleted row back to rows with _changes == None
1396 # then __getattr__ will not work for the indexes
1397 row
.__dict
__["_changes"] = {}
1398 row
.__dict
__["_mutations"] = {}
1399 row
._table
.rows
[row
.uuid
] = row
1400 elif row
._data
is None:
1401 del row
._table
.rows
[row
.uuid
]
1402 row
.__dict
__["_changes"] = {}
1403 row
.__dict
__["_mutations"] = {}
1404 row
.__dict
__["_prereqs"] = {}
1408 """Attempts to commit 'txn'. Returns the status of the commit
1409 operation, one of the following constants:
1411 Transaction.INCOMPLETE:
1413 The transaction is in progress, but not yet complete. The caller
1414 should call again later, after calling Idl.run() to let the
1415 IDL do OVSDB protocol processing.
1417 Transaction.UNCHANGED:
1419 The transaction is complete. (It didn't actually change the
1420 database, so the IDL didn't send any request to the database
1423 Transaction.ABORTED:
1425 The caller previously called Transaction.abort().
1427 Transaction.SUCCESS:
1429 The transaction was successful. The update made by the
1430 transaction (and possibly other changes made by other database
1431 clients) should already be visible in the IDL.
1433 Transaction.TRY_AGAIN:
1435 The transaction failed for some transient reason, e.g. because a
1436 "verify" operation reported an inconsistency or due to a network
1437 problem. The caller should wait for a change to the database,
1438 then compose a new transaction, and commit the new transaction.
1440 Use Idl.change_seqno to wait for a change in the database. It is
1441 important to use its value *before* the initial call to
1442 Transaction.commit() as the baseline for this purpose, because
1443 the change that one should wait for can happen after the initial
1444 call but before the call that returns Transaction.TRY_AGAIN, and
1445 using some other baseline value in that situation could cause an
1446 indefinite wait if the database rarely changes.
1448 Transaction.NOT_LOCKED:
1450 The transaction failed because the IDL has been configured to
1451 require a database lock (with Idl.set_lock()) but didn't
1452 get it yet or has already lost it.
1454 Committing a transaction rolls back all of the changes that it made to
1455 the IDL's copy of the database. If the transaction commits
1456 successfully, then the database server will send an update and, thus,
1457 the IDL will be updated with the committed changes."""
1458 # The status can only change if we're the active transaction.
1459 # (Otherwise, our status will change only in Idl.run().)
1460 if self
!= self
.idl
.txn
:
1463 # If we need a lock but don't have it, give up quickly.
1464 if self
.idl
.lock_name
and not self
.idl
.has_lock
:
1465 self
._status
= Transaction
.NOT_LOCKED
1466 self
.__disassemble
()
1469 operations
= [self
.idl
._db
.name
]
1471 # Assert that we have the required lock (avoiding a race).
1472 if self
.idl
.lock_name
:
1473 operations
.append({"op": "assert",
1474 "lock": self
.idl
.lock_name
})
1476 # Add prerequisites and declarations of new rows.
1477 for row
in six
.itervalues(self
._txn
_rows
):
1481 for column_name
in row
._prereqs
:
1482 columns
.append(column_name
)
1483 rows
[column_name
] = row
._data
[column_name
].to_json()
1484 operations
.append({"op": "wait",
1485 "table": row
._table
.name
,
1487 "where": _where_uuid_equals(row
.uuid
),
1494 for row
in six
.itervalues(self
._txn
_rows
):
1495 if row
._changes
is None:
1496 if row
._table
.is_root
:
1497 operations
.append({"op": "delete",
1498 "table": row
._table
.name
,
1499 "where": _where_uuid_equals(row
.uuid
)})
1502 # Let ovsdb-server decide whether to really delete it.
1505 op
= {"table": row
._table
.name
}
1506 if row
._data
is None:
1508 op
["uuid-name"] = _uuid_name_from_uuid(row
.uuid
)
1511 op_index
= len(operations
) - 1
1512 self
._inserted
_rows
[row
.uuid
] = _InsertedRow(op_index
)
1515 op
["where"] = _where_uuid_equals(row
.uuid
)
1518 op
["row"] = row_json
1520 for column_name
, datum
in six
.iteritems(row
._changes
):
1521 if row
._data
is not None or not datum
.is_default():
1522 row_json
[column_name
] = (
1523 self
._substitute
_uuids
(datum
.to_json()))
1525 # If anything really changed, consider it an update.
1526 # We can't suppress not-really-changed values earlier
1527 # or transactions would become nonatomic (see the big
1528 # comment inside Transaction._write()).
1529 if (not any_updates
and row
._data
is not None and
1530 row
._data
[column_name
] != datum
):
1533 if row
._data
is None or row_json
:
1534 operations
.append(op
)
1537 op
= {"table": row
._table
.name
}
1539 if row
._data
is None:
1541 op
["where"] = self
._substitute
_uuids
(
1542 _where_uuid_equals(row
.uuid
))
1545 op
["where"] = _where_uuid_equals(row
.uuid
)
1546 op
["mutations"] = []
1547 if '_removes' in row
._mutations
.keys():
1548 for col
, dat
in six
.iteritems(row
._mutations
['_removes']):
1549 column
= row
._table
.columns
[col
]
1550 if column
.type.is_map():
1552 opdat
.append(list(dat
))
1558 datum
= data
.Datum
.from_python(column
.type,
1563 self
._substitute
_uuids
(datum
.to_json()))
1564 opdat
.append(inner_opdat
)
1565 mutation
= [col
, "delete", opdat
]
1566 op
["mutations"].append(mutation
)
1568 if '_inserts' in row
._mutations
.keys():
1569 for col
, val
in six
.iteritems(row
._mutations
['_inserts']):
1570 column
= row
._table
.columns
[col
]
1571 if column
.type.is_map():
1573 datum
= data
.Datum
.from_python(column
.type, val
,
1575 opdat
.append(datum
.as_list())
1581 datum
= data
.Datum
.from_python(column
.type,
1586 self
._substitute
_uuids
(datum
.to_json()))
1587 opdat
.append(inner_opdat
)
1588 mutation
= [col
, "insert", opdat
]
1589 op
["mutations"].append(mutation
)
1592 operations
.append(op
)
1595 if self
._fetch
_requests
:
1596 for fetch
in self
._fetch
_requests
:
1597 fetch
["index"] = len(operations
) - 1
1598 operations
.append({"op": "select",
1599 "table": fetch
["row"]._table
.name
,
1600 "where": self
._substitute
_uuids
(
1601 _where_uuid_equals(fetch
["row"].uuid
)),
1602 "columns": [fetch
["column_name"]]})
1606 if self
._inc
_row
and any_updates
:
1607 self
._inc
_index
= len(operations
) - 1
1609 operations
.append({"op": "mutate",
1610 "table": self
._inc
_row
._table
.name
,
1611 "where": self
._substitute
_uuids
(
1612 _where_uuid_equals(self
._inc
_row
.uuid
)),
1613 "mutations": [[self
._inc
_column
, "+=", 1]]})
1614 operations
.append({"op": "select",
1615 "table": self
._inc
_row
._table
.name
,
1616 "where": self
._substitute
_uuids
(
1617 _where_uuid_equals(self
._inc
_row
.uuid
)),
1618 "columns": [self
._inc
_column
]})
1622 operations
.append({"op": "comment",
1623 "comment": "\n".join(self
._comments
)})
1627 operations
.append({"op": "abort"})
1630 self
._status
= Transaction
.UNCHANGED
1632 msg
= ovs
.jsonrpc
.Message
.create_request("transact", operations
)
1633 self
._request
_id
= msg
.id
1634 if not self
.idl
._session
.send(msg
):
1635 self
.idl
._outstanding
_txns
[self
._request
_id
] = self
1636 self
._status
= Transaction
.INCOMPLETE
1638 self
._status
= Transaction
.TRY_AGAIN
1640 self
.__disassemble
()
1643 def commit_block(self
):
1644 """Attempts to commit this transaction, blocking until the commit
1645 either succeeds or fails. Returns the final commit status, which may
1646 be any Transaction.* value other than Transaction.INCOMPLETE.
1648 This function calls Idl.run() on this transaction'ss IDL, so it may
1649 cause Idl.change_seqno to change."""
1651 status
= self
.commit()
1652 if status
!= Transaction
.INCOMPLETE
:
1657 poller
= ovs
.poller
.Poller()
1658 self
.idl
.wait(poller
)
1662 def get_increment_new_value(self
):
1663 """Returns the final (incremented) value of the column in this
1664 transaction that was set to be incremented by Row.increment. This
1665 transaction must have committed successfully."""
1666 assert self
._status
== Transaction
.SUCCESS
1667 return self
._inc
_new
_value
1670 """Aborts this transaction. If Transaction.commit() has already been
1671 called then the transaction might get committed anyhow."""
1672 self
.__disassemble
()
1673 if self
._status
in (Transaction
.UNCOMMITTED
,
1674 Transaction
.INCOMPLETE
):
1675 self
._status
= Transaction
.ABORTED
1677 def get_error(self
):
1678 """Returns a string representing this transaction's current status,
1679 suitable for use in log messages."""
1680 if self
._status
!= Transaction
.ERROR
:
1681 return Transaction
.status_to_string(self
._status
)
1685 return "no error details available"
1687 def __set_error_json(self
, json
):
1688 if self
._error
is None:
1689 self
._error
= ovs
.json
.to_string(json
)
1691 def get_insert_uuid(self
, uuid
):
1692 """Finds and returns the permanent UUID that the database assigned to a
1693 newly inserted row, given the UUID that Transaction.insert() assigned
1694 locally to that row.
1696 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1697 or if it was assigned by that function and then deleted by Row.delete()
1698 within the same transaction. (Rows that are inserted and then deleted
1699 within a single transaction are never sent to the database server, so
1700 it never assigns them a permanent UUID.)
1702 This transaction must have completed successfully."""
1703 assert self
._status
in (Transaction
.SUCCESS
,
1704 Transaction
.UNCHANGED
)
1705 inserted_row
= self
._inserted
_rows
.get(uuid
)
1707 return inserted_row
.real
1710 def _increment(self
, row
, column
):
1711 assert not self
._inc
_row
1713 self
._inc
_column
= column
1715 def _fetch(self
, row
, column_name
):
1716 self
._fetch
_requests
.append({"row": row
, "column_name": column_name
})
1718 def _write(self
, row
, column
, datum
):
1719 assert row
._changes
is not None
1720 assert row
._mutations
is not None
1724 # If this is a write-only column and the datum being written is the
1725 # same as the one already there, just skip the update entirely. This
1726 # is worth optimizing because we have a lot of columns that get
1727 # periodically refreshed into the database but don't actually change
1730 # We don't do this for read/write columns because that would break
1731 # atomicity of transactions--some other client might have written a
1732 # different value in that column since we read it. (But if a whole
1733 # transaction only does writes of existing values, without making any
1734 # real changes, we will drop the whole transaction later in
1735 # ovsdb_idl_txn_commit().)
1736 if (not column
.alert
and row
._data
and
1737 row
._data
.get(column
.name
) == datum
):
1738 new_value
= row
._changes
.get(column
.name
)
1739 if new_value
is None or new_value
== datum
:
1742 txn
._txn
_rows
[row
.uuid
] = row
1743 if '_inserts' in row
._mutations
:
1744 row
._mutations
['_inserts'].pop(column
.name
, None)
1745 if '_removes' in row
._mutations
:
1746 row
._mutations
['_removes'].pop(column
.name
, None)
1747 row
._changes
[column
.name
] = datum
.copy()
1749 def insert(self
, table
, new_uuid
=None):
1750 """Inserts and returns a new row in 'table', which must be one of the
1751 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1753 The new row is assigned a provisional UUID. If 'uuid' is None then one
1754 is randomly generated; otherwise 'uuid' should specify a randomly
1755 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
1756 different UUID when 'txn' is committed, but the IDL will replace any
1757 uses of the provisional UUID in the data to be to be committed by the
1758 UUID assigned by ovsdb-server."""
1759 assert self
._status
== Transaction
.UNCOMMITTED
1760 if new_uuid
is None:
1761 new_uuid
= uuid
.uuid4()
1762 row
= Row(self
.idl
, table
, new_uuid
, None)
1763 table
.rows
[row
.uuid
] = row
1764 self
._txn
_rows
[row
.uuid
] = row
1767 def _process_reply(self
, msg
):
1768 if msg
.type == ovs
.jsonrpc
.Message
.T_ERROR
:
1769 self
._status
= Transaction
.ERROR
1770 elif not isinstance(msg
.result
, (list, tuple)):
1772 vlog
.warn('reply to "transact" is not JSON array')
1781 # This isn't an error in itself but indicates that some
1782 # prior operation failed, so make sure that we know about
1785 elif isinstance(op
, dict):
1786 error
= op
.get("error")
1787 if error
is not None:
1788 if error
== "timed out":
1790 elif error
== "not owner":
1792 elif error
== "aborted":
1796 self
.__set
_error
_json
(op
)
1799 self
.__set
_error
_json
(op
)
1801 vlog
.warn("operation reply is not JSON null or object")
1803 if not soft_errors
and not hard_errors
and not lock_errors
:
1804 if self
._inc
_row
and not self
.__process
_inc
_reply
(ops
):
1806 if self
._fetch
_requests
:
1807 if self
.__process
_fetch
_reply
(ops
):
1808 self
.idl
.change_seqno
+= 1
1812 for insert
in six
.itervalues(self
._inserted
_rows
):
1813 if not self
.__process
_insert
_reply
(insert
, ops
):
1817 self
._status
= Transaction
.ERROR
1819 self
._status
= Transaction
.NOT_LOCKED
1821 self
._status
= Transaction
.TRY_AGAIN
1823 self
._status
= Transaction
.SUCCESS
1826 def __check_json_type(json
, types
, name
):
1829 vlog
.warn("%s is missing" % name
)
1831 elif not isinstance(json
, tuple(types
)):
1833 vlog
.warn("%s has unexpected type %s" % (name
, type(json
)))
1838 def __process_fetch_reply(self
, ops
):
1840 for fetch_request
in self
._fetch
_requests
:
1841 row
= fetch_request
["row"]
1842 column_name
= fetch_request
["column_name"]
1843 index
= fetch_request
["index"]
1847 fetched_rows
= select
.get("rows")
1848 if not Transaction
.__check
_json
_type
(fetched_rows
, (list, tuple),
1849 '"select" reply "rows"'):
1851 if len(fetched_rows
) != 1:
1853 vlog
.warn('"select" reply "rows" has %d elements '
1854 'instead of 1' % len(fetched_rows
))
1856 fetched_row
= fetched_rows
[0]
1857 if not Transaction
.__check
_json
_type
(fetched_row
, (dict,),
1858 '"select" reply row'):
1861 column
= table
.columns
.get(column_name
)
1862 datum_json
= fetched_row
.get(column_name
)
1863 datum
= data
.Datum
.from_json(column
.type, datum_json
)
1865 row
._data
[column_name
] = datum
1870 def __process_inc_reply(self
, ops
):
1871 if self
._inc
_index
+ 2 > len(ops
):
1873 vlog
.warn("reply does not contain enough operations for "
1874 "increment (has %d, needs %d)" %
1875 (len(ops
), self
._inc
_index
+ 2))
1877 # We know that this is a JSON object because the loop in
1878 # __process_reply() already checked.
1879 mutate
= ops
[self
._inc
_index
]
1880 count
= mutate
.get("count")
1881 if not Transaction
.__check
_json
_type
(count
, six
.integer_types
,
1882 '"mutate" reply "count"'):
1886 vlog
.warn('"mutate" reply "count" is %d instead of 1' % count
)
1889 select
= ops
[self
._inc
_index
+ 1]
1890 rows
= select
.get("rows")
1891 if not Transaction
.__check
_json
_type
(rows
, (list, tuple),
1892 '"select" reply "rows"'):
1896 vlog
.warn('"select" reply "rows" has %d elements '
1897 'instead of 1' % len(rows
))
1900 if not Transaction
.__check
_json
_type
(row
, (dict,),
1901 '"select" reply row'):
1903 column
= row
.get(self
._inc
_column
)
1904 if not Transaction
.__check
_json
_type
(column
, six
.integer_types
,
1905 '"select" reply inc column'):
1907 self
._inc
_new
_value
= column
1910 def __process_insert_reply(self
, insert
, ops
):
1911 if insert
.op_index
>= len(ops
):
1913 vlog
.warn("reply does not contain enough operations "
1914 "for insert (has %d, needs %d)"
1915 % (len(ops
), insert
.op_index
))
1918 # We know that this is a JSON object because the loop in
1919 # __process_reply() already checked.
1920 reply
= ops
[insert
.op_index
]
1921 json_uuid
= reply
.get("uuid")
1922 if not Transaction
.__check
_json
_type
(json_uuid
, (tuple, list),
1923 '"insert" reply "uuid"'):
1927 uuid_
= ovs
.ovsuuid
.from_json(json_uuid
)
1930 vlog
.warn('"insert" reply "uuid" is not a JSON UUID')
1937 class SchemaHelper(object):
1938 """IDL Schema helper.
1940 This class encapsulates the logic required to generate schemas suitable
1941 for creating 'ovs.db.idl.Idl' objects. Clients should register columns
1942 they are interested in using register_columns(). When finished, the
1943 get_idl_schema() function may be called.
1945 The location on disk of the schema used may be found in the
1946 'schema_location' variable."""
1948 def __init__(self
, location
=None, schema_json
=None):
1949 """Creates a new Schema object.
1951 'location' file path to ovs schema. None means default location
1952 'schema_json' schema in json preresentation in memory
1955 if location
and schema_json
:
1956 raise ValueError("both location and schema_json can't be "
1957 "specified. it's ambiguous.")
1958 if schema_json
is None:
1959 if location
is None:
1960 location
= "%s/vswitch.ovsschema" % ovs
.dirs
.PKGDATADIR
1961 schema_json
= ovs
.json
.from_file(location
)
1963 self
.schema_json
= schema_json
1968 def register_columns(self
, table
, columns
, readonly
=[]):
1969 """Registers interest in the given 'columns' of 'table'. Future calls
1970 to get_idl_schema() will include 'table':column for each column in
1971 'columns'. This function automatically avoids adding duplicate entries
1973 A subset of 'columns' can be specified as 'readonly'. The readonly
1974 columns are not replicated but can be fetched on-demand by the user
1977 'table' must be a string.
1978 'columns' must be a list of strings.
1979 'readonly' must be a list of strings.
1982 assert isinstance(table
, six
.string_types
)
1983 assert isinstance(columns
, list)
1985 columns
= set(columns
) | self
._tables
.get(table
, set())
1986 self
._tables
[table
] = columns
1987 self
._readonly
[table
] = readonly
1989 def register_table(self
, table
):
1990 """Registers interest in the given all columns of 'table'. Future calls
1991 to get_idl_schema() will include all columns of 'table'.
1993 'table' must be a string
1995 assert isinstance(table
, six
.string_types
)
1996 self
._tables
[table
] = set() # empty set means all columns in the table
1998 def register_all(self
):
1999 """Registers interest in every column of every table."""
2002 def get_idl_schema(self
):
2003 """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
2004 object based on columns registered using the register_columns()
2007 schema
= ovs
.db
.schema
.DbSchema
.from_json(self
.schema_json
)
2008 self
.schema_json
= None
2012 for table
, columns
in six
.iteritems(self
._tables
):
2013 schema_tables
[table
] = (
2014 self
._keep
_table
_columns
(schema
, table
, columns
))
2016 schema
.tables
= schema_tables
2017 schema
.readonly
= self
._readonly
2020 def _keep_table_columns(self
, schema
, table_name
, columns
):
2021 assert table_name
in schema
.tables
2022 table
= schema
.tables
[table_name
]
2025 # empty set means all columns in the table
2029 for column_name
in columns
:
2030 assert isinstance(column_name
, six
.string_types
)
2031 assert column_name
in table
.columns
2033 new_columns
[column_name
] = table
.columns
[column_name
]
2035 table
.columns
= new_columns