1 # Copyright (c) 2009, 2010, 2011, 2012, 2013, 2016 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 import ovs
.db
.data
as data
25 from ovs
.db
import custom_index
26 from ovs
.db
import error
# Module-wide logger for the IDL.
vlog = ovs.vlog.Vlog("idl")

# Hint for the (legacy) pychecker static analyzer.
__pychecker__ = 'no-classattr no-objattrs'

# Compared against a server database's 'model' attribute to detect a
# clustered database (see Idl.__check_server_db).
CLUSTERED = "clustered"
43 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
45 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
46 requests to an OVSDB database server and parses the responses, converting
47 raw JSON into data structures that are easier for clients to digest.
49 The IDL also assists with issuing database transactions. The client
50 creates a transaction, manipulates the IDL data structures, and commits or
51 aborts the transaction. The IDL then composes and issues the necessary
52 JSON-RPC requests and reports to the client whether the transaction
53 completed successfully.
55 The client is allowed to access the following attributes directly, in a
58 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
59 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
60 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
63 The client may directly read and write the Row objects referenced by the
64 'rows' map values. Refer to Row for more details.
66 - 'change_seqno': A number that represents the IDL's state. When the IDL
67 is updated (by Idl.run()), its value changes. The sequence number can
68 occasionally change even if the database does not. This happens if the
69 connection to the database drops and reconnects, which causes the
70 database contents to be reloaded even if they didn't change. (It could
71 also happen if the database server sends out a "change" that reflects
72 what the IDL already thought was in the database. The database server is
73 not supposed to do that, but bugs could in theory cause it to do so.)
75 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
76 if no lock is configured.
78 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
79 lock, and False otherwise.
81 Locking and unlocking happens asynchronously from the database client's
82 point of view, so the information is only useful for optimization
83 (e.g. if the client doesn't have the lock then there's no point in trying
84 to write to the database).
86 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
87 the database server has indicated that some other client already owns the
88 requested lock, and False otherwise.
90 - 'txn': The ovs.db.idl.Transaction object for the database transaction
91 currently being constructed, if there is one, or None otherwise.
    # States of the IDL's connection/monitor state machine (self.state).
    # NOTE(review): IDL_S_INITIAL is defined outside this excerpt —
    # presumably 0; confirm against the full file.
    IDL_S_SERVER_SCHEMA_REQUESTED = 1
    IDL_S_SERVER_MONITOR_REQUESTED = 2
    IDL_S_DATA_MONITOR_REQUESTED = 3
    IDL_S_DATA_MONITOR_COND_REQUESTED = 4
100 def __init__(self
, remote
, schema_helper
, probe_interval
=None,
102 """Creates and returns a connection to the database named 'db_name' on
103 'remote', which should be in a form acceptable to
104 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
105 replica of the remote database.
107 'remote' can be comma separated multiple remotes and each remote
108 should be in a form acceptable to ovs.jsonrpc.session.open().
110 'schema_helper' should be an instance of the SchemaHelper class which
111 generates schema for the remote database. The caller may have cut it
112 down by removing tables or columns that are not of interest. The IDL
113 will only replicate the tables and columns that remain. The caller may
114 also add an attribute named 'alert' to selected remaining columns,
115 setting its value to False; if so, then changes to those columns will
116 not be considered changes to the database for the purpose of the return
117 value of Idl.run() and Idl.change_seqno. This is useful for columns
118 that the IDL's client will write but not read.
120 As a convenience to users, 'schema' may also be an instance of the
123 The IDL uses and modifies 'schema' directly.
125 If 'leader_only' is set to True (default value) the IDL will only
126 monitor and transact with the leader of the cluster.
128 If "probe_interval" is zero it disables the connection keepalive
129 feature. If non-zero the value will be forced to at least 1000
130 milliseconds. If None it will just use the default value in OVS.
133 assert isinstance(schema_helper
, SchemaHelper
)
134 schema
= schema_helper
.get_idl_schema()
136 self
.tables
= schema
.tables
137 self
.readonly
= schema
.readonly
139 remotes
= self
._parse
_remotes
(remote
)
140 self
._session
= ovs
.jsonrpc
.Session
.open_multiple(remotes
,
141 probe_interval
=probe_interval
)
142 self
._monitor
_request
_id
= None
143 self
._last
_seqno
= None
144 self
.change_seqno
= 0
145 self
.uuid
= uuid
.uuid1()
148 self
._server
_schema
_request
_id
= None
149 self
._server
_monitor
_request
_id
= None
150 self
._db
_change
_aware
_request
_id
= None
151 self
._server
_db
_name
= '_Server'
152 self
._server
_db
_table
= 'Database'
153 self
.server_tables
= None
154 self
._server
_db
= None
155 self
.server_monitor_uuid
= uuid
.uuid1()
156 self
.leader_only
= leader_only
157 self
.cluster_id
= None
160 self
.state
= self
.IDL_S_INITIAL
163 self
.lock_name
= None # Name of lock we need, None if none.
164 self
.has_lock
= False # Has db server said we have the lock?
165 self
.is_lock_contended
= False # Has db server said we can't get lock?
166 self
._lock
_request
_id
= None # JSON-RPC ID of in-flight lock request.
168 # Transaction support.
170 self
._outstanding
_txns
= {}
172 for table
in schema
.tables
.values():
173 for column
in table
.columns
.values():
174 if not hasattr(column
, 'alert'):
176 table
.need_table
= False
177 table
.rows
= custom_index
.IndexedRows(table
)
179 table
.condition
= [True]
180 table
.cond_changed
= False
182 def _parse_remotes(self
, remote
):
184 # "tcp:10.0.0.1:6641,unix:/tmp/db.sock,t,s,tcp:10.0.0.2:6642"
185 # this function returns
186 # ["tcp:10.0.0.1:6641", "unix:/tmp/db.sock,t,s", tcp:10.0.0.2:6642"]
188 for r
in remote
.split(','):
189 if remotes
and r
.find(":") == -1:
190 remotes
[-1] += "," + r
195 def set_cluster_id(self
, cluster_id
):
196 """Set the id of the cluster that this idl must connect to."""
197 self
.cluster_id
= cluster_id
198 if self
.state
!= self
.IDL_S_INITIAL
:
199 self
.force_reconnect()
201 def index_create(self
, table
, name
):
202 """Create a named multi-column index on a table"""
203 return self
.tables
[table
].rows
.index_create(name
)
205 def index_irange(self
, table
, name
, start
, end
):
206 """Return items in a named index between start/end inclusive"""
207 return self
.tables
[table
].rows
.indexes
[name
].irange(start
, end
)
209 def index_equal(self
, table
, name
, value
):
210 """Return items in a named index matching a value"""
211 return self
.tables
[table
].rows
.indexes
[name
].irange(value
, value
)
214 """Closes the connection to the database. The IDL will no longer
216 self
._session
.close()
219 """Processes a batch of messages from the database server. Returns
220 True if the database as seen through the IDL changed, False if it did
221 not change. The initial fetch of the entire contents of the remote
222 database is considered to be one kind of change. If the IDL has been
223 configured to acquire a database lock (with Idl.set_lock()), then
224 successfully acquiring the lock is also considered to be a change.
226 This function can return occasional false positives, that is, report
227 that the database changed even though it didn't. This happens if the
228 connection to the database drops and reconnects, which causes the
229 database contents to be reloaded even if they didn't change. (It could
230 also happen if the database server sends out a "change" that reflects
231 what we already thought was in the database, but the database server is
232 not supposed to do that.)
234 As an alternative to checking the return value, the client may check
235 for changes in self.change_seqno."""
237 initial_change_seqno
= self
.change_seqno
239 self
.send_cond_change()
244 if not self
._session
.is_connected():
247 seqno
= self
._session
.get_seqno()
248 if seqno
!= self
._last
_seqno
:
249 self
._last
_seqno
= seqno
250 self
.__txn
_abort
_all
()
251 self
.__send
_server
_schema
_request
()
253 self
.__send
_lock
_request
()
256 msg
= self
._session
.recv()
260 if (msg
.type == ovs
.jsonrpc
.Message
.T_NOTIFY
261 and msg
.method
== "update2"
262 and len(msg
.params
) == 2):
263 # Database contents changed.
264 self
.__parse
_update
(msg
.params
[1], OVSDB_UPDATE2
)
265 elif (msg
.type == ovs
.jsonrpc
.Message
.T_NOTIFY
266 and msg
.method
== "update"
267 and len(msg
.params
) == 2):
268 # Database contents changed.
269 if msg
.params
[0] == str(self
.server_monitor_uuid
):
270 self
.__parse
_update
(msg
.params
[1], OVSDB_UPDATE
,
271 tables
=self
.server_tables
)
272 self
.change_seqno
= initial_change_seqno
273 if not self
.__check
_server
_db
():
274 self
.force_reconnect()
277 self
.__parse
_update
(msg
.params
[1], OVSDB_UPDATE
)
278 elif (msg
.type == ovs
.jsonrpc
.Message
.T_REPLY
279 and self
._monitor
_request
_id
is not None
280 and self
._monitor
_request
_id
== msg
.id):
281 # Reply to our "monitor" request.
283 self
.change_seqno
+= 1
284 self
._monitor
_request
_id
= None
286 if self
.state
== self
.IDL_S_DATA_MONITOR_COND_REQUESTED
:
287 self
.__parse
_update
(msg
.result
, OVSDB_UPDATE2
)
289 assert self
.state
== self
.IDL_S_DATA_MONITOR_REQUESTED
290 self
.__parse
_update
(msg
.result
, OVSDB_UPDATE
)
292 except error
.Error
as e
:
293 vlog
.err("%s: parse error in received schema: %s"
294 % (self
._session
.get_name(), e
))
296 elif (msg
.type == ovs
.jsonrpc
.Message
.T_REPLY
297 and self
._server
_schema
_request
_id
is not None
298 and self
._server
_schema
_request
_id
== msg
.id):
299 # Reply to our "get_schema" of _Server request.
301 self
._server
_schema
_request
_id
= None
302 sh
= SchemaHelper(None, msg
.result
)
303 sh
.register_table(self
._server
_db
_table
)
304 schema
= sh
.get_idl_schema()
305 self
._server
_db
= schema
306 self
.server_tables
= schema
.tables
307 self
.__send
_server
_monitor
_request
()
308 except error
.Error
as e
:
309 vlog
.err("%s: error receiving server schema: %s"
310 % (self
._session
.get_name(), e
))
315 self
.change_seqno
= initial_change_seqno
316 self
.__send
_monitor
_request
()
317 elif (msg
.type == ovs
.jsonrpc
.Message
.T_REPLY
318 and self
._server
_monitor
_request
_id
is not None
319 and self
._server
_monitor
_request
_id
== msg
.id):
320 # Reply to our "monitor" of _Server request.
322 self
._server
_monitor
_request
_id
= None
323 self
.__parse
_update
(msg
.result
, OVSDB_UPDATE
,
324 tables
=self
.server_tables
)
325 self
.change_seqno
= initial_change_seqno
326 if self
.__check
_server
_db
():
327 self
.__send
_monitor
_request
()
328 self
.__send
_db
_change
_aware
()
330 self
.force_reconnect()
332 except error
.Error
as e
:
333 vlog
.err("%s: parse error in received schema: %s"
334 % (self
._session
.get_name(), e
))
339 self
.change_seqno
= initial_change_seqno
340 self
.__send
_monitor
_request
()
341 elif (msg
.type == ovs
.jsonrpc
.Message
.T_REPLY
342 and self
._db
_change
_aware
_request
_id
is not None
343 and self
._db
_change
_aware
_request
_id
== msg
.id):
344 # Reply to us notifying the server of our change awareness.
345 self
._db
_change
_aware
_request
_id
= None
346 elif (msg
.type == ovs
.jsonrpc
.Message
.T_REPLY
347 and self
._lock
_request
_id
is not None
348 and self
._lock
_request
_id
== msg
.id):
349 # Reply to our "lock" request.
350 self
.__parse
_lock
_reply
(msg
.result
)
351 elif (msg
.type == ovs
.jsonrpc
.Message
.T_NOTIFY
352 and msg
.method
== "locked"):
354 self
.__parse
_lock
_notify
(msg
.params
, True)
355 elif (msg
.type == ovs
.jsonrpc
.Message
.T_NOTIFY
356 and msg
.method
== "stolen"):
357 # Someone else stole our lock.
358 self
.__parse
_lock
_notify
(msg
.params
, False)
359 elif msg
.type == ovs
.jsonrpc
.Message
.T_NOTIFY
and msg
.id == "echo":
360 # Reply to our echo request. Ignore it.
362 elif (msg
.type == ovs
.jsonrpc
.Message
.T_ERROR
and
363 self
.state
== self
.IDL_S_DATA_MONITOR_COND_REQUESTED
and
364 self
._monitor
_request
_id
== msg
.id):
365 if msg
.error
== "unknown method":
366 self
.__send
_monitor
_request
()
367 elif (msg
.type == ovs
.jsonrpc
.Message
.T_ERROR
and
368 self
._server
_schema
_request
_id
is not None and
369 self
._server
_schema
_request
_id
== msg
.id):
370 self
._server
_schema
_request
_id
= None
372 self
.force_reconnect()
375 self
.change_seqno
= initial_change_seqno
376 self
.__send
_monitor
_request
()
377 elif (msg
.type in (ovs
.jsonrpc
.Message
.T_ERROR
,
378 ovs
.jsonrpc
.Message
.T_REPLY
)
379 and self
.__txn
_process
_reply
(msg
)):
380 # __txn_process_reply() did everything needed.
383 # This can happen if a transaction is destroyed before we
384 # receive the reply, so keep the log level low.
385 vlog
.dbg("%s: received unexpected %s message"
386 % (self
._session
.get_name(),
387 ovs
.jsonrpc
.Message
.type_to_string(msg
.type)))
389 return initial_change_seqno
!= self
.change_seqno
391 def send_cond_change(self
):
392 if not self
._session
.is_connected():
395 for table
in self
.tables
.values():
396 if table
.cond_changed
:
397 self
.__send
_cond
_change
(table
, table
.condition
)
398 table
.cond_changed
= False
400 def cond_change(self
, table_name
, cond
):
401 """Sets the condition for 'table_name' to 'cond', which should be a
402 conditional expression suitable for use directly in the OVSDB
403 protocol, with the exception that the empty condition []
404 matches no rows (instead of matching every row). That is, []
405 is equivalent to [False], not to [True].
408 table
= self
.tables
.get(table_name
)
410 raise error
.Error('Unknown table "%s"' % table_name
)
414 if table
.condition
!= cond
:
415 table
.condition
= cond
416 table
.cond_changed
= True
418 def wait(self
, poller
):
419 """Arranges for poller.block() to wake up when self.run() has something
420 to do or when activity occurs on a transaction on 'self'."""
421 self
._session
.wait(poller
)
422 self
._session
.recv_wait(poller
)
424 def has_ever_connected(self
):
425 """Returns True, if the IDL successfully connected to the remote
426 database and retrieved its contents (even if the connection
427 subsequently dropped and is in the process of reconnecting). If so,
428 then the IDL contains an atomic snapshot of the database's contents
429 (but it might be arbitrarily old if the connection dropped).
431 Returns False if the IDL has never connected or retrieved the
432 database's contents. If so, the IDL is empty."""
433 return self
.change_seqno
!= 0
    def force_reconnect(self):
        """Forces the IDL to drop its connection to the database and reconnect.
        In the meantime, the contents of the IDL will not change."""
        # Delegates to the underlying JSON-RPC session.
        self._session.force_reconnect()
    def session_name(self):
        """Return the name of the underlying JSON-RPC session (used in log
        messages to identify the remote)."""
        return self._session.get_name()
443 def set_lock(self
, lock_name
):
444 """If 'lock_name' is not None, configures the IDL to obtain the named
445 lock from the database server and to avoid modifying the database when
446 the lock cannot be acquired (that is, when another client has the same
449 If 'lock_name' is None, drops the locking requirement and releases the
452 assert not self
._outstanding
_txns
454 if self
.lock_name
and (not lock_name
or lock_name
!= self
.lock_name
):
455 # Release previous lock.
456 self
.__send
_unlock
_request
()
457 self
.lock_name
= None
458 self
.is_lock_contended
= False
460 if lock_name
and not self
.lock_name
:
462 self
.lock_name
= lock_name
463 self
.__send
_lock
_request
()
465 def notify(self
, event
, row
, updates
=None):
466 """Hook for implementing create/update/delete notifications
468 :param event: The event that was triggered
469 :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
470 :param row: The row as it is after the operation has occurred
472 :param updates: For updates, row with only old values of the changed
477 def __send_cond_change(self
, table
, cond
):
478 monitor_cond_change
= {table
.name
: [{"where": cond
}]}
479 old_uuid
= str(self
.uuid
)
480 self
.uuid
= uuid
.uuid1()
481 params
= [old_uuid
, str(self
.uuid
), monitor_cond_change
]
482 msg
= ovs
.jsonrpc
.Message
.create_request("monitor_cond_change", params
)
483 self
._session
.send(msg
)
488 for table
in self
.tables
.values():
491 table
.rows
= custom_index
.IndexedRows(table
)
494 self
.change_seqno
+= 1
496 def __update_has_lock(self
, new_has_lock
):
497 if new_has_lock
and not self
.has_lock
:
498 if self
._monitor
_request
_id
is None:
499 self
.change_seqno
+= 1
501 # We're waiting for a monitor reply, so don't signal that the
502 # database changed. The monitor reply will increment
503 # change_seqno anyhow.
505 self
.is_lock_contended
= False
506 self
.has_lock
= new_has_lock
508 def __do_send_lock_request(self
, method
):
509 self
.__update
_has
_lock
(False)
510 self
._lock
_request
_id
= None
511 if self
._session
.is_connected():
512 msg
= ovs
.jsonrpc
.Message
.create_request(method
, [self
.lock_name
])
514 self
._session
.send(msg
)
    def __send_lock_request(self):
        # Request ownership of self.lock_name, remembering the request id
        # so the server's reply can be matched up in run().
        self._lock_request_id = self.__do_send_lock_request("lock")
    def __send_unlock_request(self):
        # Release self.lock_name.  Unlike __send_lock_request, the request
        # id is not recorded here.
        self.__do_send_lock_request("unlock")
525 def __parse_lock_reply(self
, result
):
526 self
._lock
_request
_id
= None
527 got_lock
= isinstance(result
, dict) and result
.get("locked") is True
528 self
.__update
_has
_lock
(got_lock
)
530 self
.is_lock_contended
= True
532 def __parse_lock_notify(self
, params
, new_has_lock
):
533 if (self
.lock_name
is not None
534 and isinstance(params
, (list, tuple))
536 and params
[0] == self
.lock_name
):
537 self
.__update
_has
_lock
(new_has_lock
)
539 self
.is_lock_contended
= True
541 def __send_db_change_aware(self
):
542 msg
= ovs
.jsonrpc
.Message
.create_request("set_db_change_aware",
544 self
._db
_change
_aware
_request
_id
= msg
.id
545 self
._session
.send(msg
)
547 def __send_monitor_request(self
):
548 if (self
.state
in [self
.IDL_S_SERVER_MONITOR_REQUESTED
,
549 self
.IDL_S_INITIAL
]):
550 self
.state
= self
.IDL_S_DATA_MONITOR_COND_REQUESTED
551 method
= "monitor_cond"
553 self
.state
= self
.IDL_S_DATA_MONITOR_REQUESTED
556 monitor_requests
= {}
557 for table
in self
.tables
.values():
559 for column
in table
.columns
.keys():
560 if ((table
.name
not in self
.readonly
) or
561 (table
.name
in self
.readonly
) and
562 (column
not in self
.readonly
[table
.name
])):
563 columns
.append(column
)
564 monitor_request
= {"columns": columns
}
565 if method
== "monitor_cond" and table
.condition
!= [True]:
566 monitor_request
["where"] = table
.condition
567 table
.cond_change
= False
568 monitor_requests
[table
.name
] = [monitor_request
]
570 msg
= ovs
.jsonrpc
.Message
.create_request(
571 method
, [self
._db
.name
, str(self
.uuid
), monitor_requests
])
572 self
._monitor
_request
_id
= msg
.id
573 self
._session
.send(msg
)
575 def __send_server_schema_request(self
):
576 self
.state
= self
.IDL_S_SERVER_SCHEMA_REQUESTED
577 msg
= ovs
.jsonrpc
.Message
.create_request(
578 "get_schema", [self
._server
_db
_name
, str(self
.uuid
)])
579 self
._server
_schema
_request
_id
= msg
.id
580 self
._session
.send(msg
)
582 def __send_server_monitor_request(self
):
583 self
.state
= self
.IDL_S_SERVER_MONITOR_REQUESTED
584 monitor_requests
= {}
585 table
= self
.server_tables
[self
._server
_db
_table
]
586 columns
= [column
for column
in table
.columns
.keys()]
587 for column
in table
.columns
.values():
588 if not hasattr(column
, 'alert'):
590 table
.rows
= custom_index
.IndexedRows(table
)
591 table
.need_table
= False
593 monitor_request
= {"columns": columns
}
594 monitor_requests
[table
.name
] = [monitor_request
]
595 msg
= ovs
.jsonrpc
.Message
.create_request(
596 'monitor', [self
._server
_db
.name
,
597 str(self
.server_monitor_uuid
),
599 self
._server
_monitor
_request
_id
= msg
.id
600 self
._session
.send(msg
)
602 def __parse_update(self
, update
, version
, tables
=None):
605 self
.__do
_parse
_update
(update
, version
, self
.tables
)
607 self
.__do
_parse
_update
(update
, version
, tables
)
608 except error
.Error
as e
:
609 vlog
.err("%s: error parsing update: %s"
610 % (self
._session
.get_name(), e
))
612 def __do_parse_update(self
, table_updates
, version
, tables
):
613 if not isinstance(table_updates
, dict):
614 raise error
.Error("<table-updates> is not an object",
617 for table_name
, table_update
in table_updates
.items():
618 table
= tables
.get(table_name
)
620 raise error
.Error('<table-updates> includes unknown '
621 'table "%s"' % table_name
)
623 if not isinstance(table_update
, dict):
624 raise error
.Error('<table-update> for table "%s" is not '
625 'an object' % table_name
, table_update
)
627 for uuid_string
, row_update
in table_update
.items():
628 if not ovs
.ovsuuid
.is_valid_string(uuid_string
):
629 raise error
.Error('<table-update> for table "%s" '
630 'contains bad UUID "%s" as member '
631 'name' % (table_name
, uuid_string
),
633 uuid
= ovs
.ovsuuid
.from_string(uuid_string
)
635 if not isinstance(row_update
, dict):
636 raise error
.Error('<table-update> for table "%s" '
637 'contains <row-update> for %s that '
639 % (table_name
, uuid_string
))
641 if version
== OVSDB_UPDATE2
:
642 if self
.__process
_update
2(table
, uuid
, row_update
):
643 self
.change_seqno
+= 1
646 parser
= ovs
.db
.parser
.Parser(row_update
, "row-update")
647 old
= parser
.get_optional("old", [dict])
648 new
= parser
.get_optional("new", [dict])
651 if not old
and not new
:
652 raise error
.Error('<row-update> missing "old" and '
653 '"new" members', row_update
)
655 if self
.__process
_update
(table
, uuid
, old
, new
):
656 self
.change_seqno
+= 1
658 def __process_update2(self
, table
, uuid
, row_update
):
659 row
= table
.rows
.get(uuid
)
661 if "delete" in row_update
:
664 self
.notify(ROW_DELETE
, row
)
668 vlog
.warn("cannot delete missing row %s from table"
669 "%s" % (uuid
, table
.name
))
670 elif "insert" in row_update
or "initial" in row_update
:
672 vlog
.warn("cannot add existing row %s from table"
673 " %s" % (uuid
, table
.name
))
675 row
= self
.__create
_row
(table
, uuid
)
676 if "insert" in row_update
:
677 row_update
= row_update
['insert']
679 row_update
= row_update
['initial']
680 self
.__add
_default
(table
, row_update
)
681 changed
= self
.__row
_update
(table
, row
, row_update
)
682 table
.rows
[uuid
] = row
684 self
.notify(ROW_CREATE
, row
)
685 elif "modify" in row_update
:
687 raise error
.Error('Modify non-existing row')
689 old_row
= self
.__apply
_diff
(table
, row
, row_update
['modify'])
690 self
.notify(ROW_UPDATE
, row
, Row(self
, table
, uuid
, old_row
))
693 raise error
.Error('<row-update> unknown operation',
697 def __process_update(self
, table
, uuid
, old
, new
):
698 """Returns True if a column changed, False otherwise."""
699 row
= table
.rows
.get(uuid
)
706 self
.notify(ROW_DELETE
, row
)
709 vlog
.warn("cannot delete missing row %s from table %s"
710 % (uuid
, table
.name
))
715 row
= self
.__create
_row
(table
, uuid
)
720 vlog
.warn("cannot add existing row %s to table %s"
721 % (uuid
, table
.name
))
722 changed |
= self
.__row
_update
(table
, row
, new
)
724 table
.rows
[uuid
] = row
726 self
.notify(ROW_CREATE
, row
)
730 row
= self
.__create
_row
(table
, uuid
)
734 vlog
.warn("cannot modify missing row %s in table %s"
735 % (uuid
, table
.name
))
736 changed |
= self
.__row
_update
(table
, row
, new
)
738 table
.rows
[uuid
] = row
740 self
.notify(op
, row
, Row
.from_json(self
, table
, uuid
, old
))
743 def __check_server_db(self
):
744 """Returns True if this is a valid server database, False otherwise."""
745 session_name
= self
.session_name()
747 if self
._server
_db
_table
not in self
.server_tables
:
748 vlog
.info("%s: server does not have %s table in its %s database"
749 % (session_name
, self
._server
_db
_table
,
750 self
._server
_db
_name
))
753 rows
= self
.server_tables
[self
._server
_db
_table
].rows
756 for row
in rows
.values():
758 if self
.cluster_id
in \
759 map(lambda x
: str(x
)[:4], row
.cid
):
762 elif row
.name
== self
._db
.name
:
767 vlog
.info("%s: server does not have %s database"
768 % (session_name
, self
._db
.name
))
771 if (database
.model
== CLUSTERED
and
772 self
._session
.get_num_of_remotes() > 1):
773 if not database
.schema
:
774 vlog
.info('%s: clustered database server has not yet joined '
775 'cluster; trying another server' % session_name
)
777 if not database
.connected
:
778 vlog
.info('%s: clustered database server is disconnected '
779 'from cluster; trying another server' % session_name
)
781 if (self
.leader_only
and
782 not database
.leader
):
783 vlog
.info('%s: clustered database server is not cluster '
784 'leader; trying another server' % session_name
)
787 if database
.index
[0] < self
._min
_index
:
788 vlog
.warn('%s: clustered database server has stale data; '
789 'trying another server' % session_name
)
791 self
._min
_index
= database
.index
[0]
795 def __column_name(self
, column
):
796 if column
.type.key
.type == ovs
.db
.types
.UuidType
:
797 return ovs
.ovsuuid
.to_json(column
.type.key
.type.default
)
799 return column
.type.key
.type.default
801 def __add_default(self
, table
, row_update
):
802 for column
in table
.columns
.values():
803 if column
.name
not in row_update
:
804 if ((table
.name
not in self
.readonly
) or
805 (table
.name
in self
.readonly
) and
806 (column
.name
not in self
.readonly
[table
.name
])):
807 if column
.type.n_min
!= 0 and not column
.type.is_map():
808 row_update
[column
.name
] = self
.__column
_name
(column
)
810 def __apply_diff(self
, table
, row
, row_diff
):
812 for column_name
, datum_diff_json
in row_diff
.items():
813 column
= table
.columns
.get(column_name
)
816 vlog
.warn("unknown column %s updating table %s"
817 % (column_name
, table
.name
))
821 datum_diff
= data
.Datum
.from_json(column
.type, datum_diff_json
)
822 except error
.Error
as e
:
824 vlog
.warn("error parsing column %s in table %s: %s"
825 % (column_name
, table
.name
, e
))
828 old_row
[column_name
] = row
._data
[column_name
].copy()
829 datum
= row
._data
[column_name
].diff(datum_diff
)
830 if datum
!= row
._data
[column_name
]:
831 row
._data
[column_name
] = datum
835 def __row_update(self
, table
, row
, row_json
):
837 for column_name
, datum_json
in row_json
.items():
838 column
= table
.columns
.get(column_name
)
841 vlog
.warn("unknown column %s updating table %s"
842 % (column_name
, table
.name
))
846 datum
= data
.Datum
.from_json(column
.type, datum_json
)
847 except error
.Error
as e
:
849 vlog
.warn("error parsing column %s in table %s: %s"
850 % (column_name
, table
.name
, e
))
853 if datum
!= row
._data
[column_name
]:
854 row
._data
[column_name
] = datum
858 # Didn't really change but the OVSDB monitor protocol always
859 # includes every value in a row.
863 def __create_row(self
, table
, uuid
):
865 for column
in table
.columns
.values():
866 data
[column
.name
] = ovs
.db
.data
.Datum
.default(column
.type)
867 return Row(self
, table
, uuid
, data
)
870 self
._session
.force_reconnect()
872 def __txn_abort_all(self
):
873 while self
._outstanding
_txns
:
874 txn
= self
._outstanding
_txns
.popitem()[1]
875 txn
._status
= Transaction
.TRY_AGAIN
877 def __txn_process_reply(self
, msg
):
878 txn
= self
._outstanding
_txns
.pop(msg
.id, None)
880 txn
._process
_reply
(msg
)
884 def _uuid_to_row(atom
, base
):
886 return base
.ref_table
.rows
.get(atom
)
891 def _row_to_uuid(value
):
892 if isinstance(value
, Row
):
898 @functools.total_ordering
900 """A row within an IDL.
902 The client may access the following attributes directly:
904 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
906 - An attribute for each column in the Row's table, named for the column,
907 whose values are as returned by Datum.to_python() for the column's type.
909 If some error occurs (e.g. the database server's idea of the column is
910 different from the IDL's idea), then the attribute value is the
911 "default" value return by Datum.default() for the column's type. (It is
912 important to know this because the default value may violate constraints
913 for the column's type, e.g. the default integer value is 0 even if column
914 constraints require the column's value to be positive.)
916 When a transaction is active, column attributes may also be assigned new
917 values. Committing the transaction will then cause the new value to be
918 stored into the database.
920 *NOTE*: In the current implementation, the value of a column is a *copy*
921 of the value in the database. This means that modifying its value
922 directly will have no useful effect. For example, the following:
923 row.mycolumn["a"] = "b" # don't do this
924 will not change anything in the database, even after commit. To modify
925 the column, instead assign the modified column value back to the column:
    def __init__(self, idl, table, uuid, data):
        # All of the explicit references to self.__dict__ below are required
        # to set real attributes without invoking self.__getattr__().
        self.__dict__["uuid"] = uuid

        self.__dict__["_idl"] = idl
        self.__dict__["_table"] = table

        # _data is the committed data.  It takes the following values:
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     exists in the committed form of the database.
        #
        #   - None, if this row is newly inserted within the active
        #     transaction and thus has no committed form.
        self.__dict__["_data"] = data

        # _changes describes changes to this row within the active
        # transaction.  It takes the following values:
        #
        #   - {}, the empty dictionary, if no transaction is active or if the
        #     row has yet not been changed within this transaction.
        #
        #   - A dictionary that maps a column name to its new Datum, if an
        #     active transaction changes those columns' values.
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     is newly inserted within the active transaction.
        #
        #   - None, if this transaction deletes this row.
        self.__dict__["_changes"] = {}

        # _mutations describes changes to this row to be handled via a
        # mutate operation on the wire.  It takes the following values:
        #
        #   - {}, the empty dictionary, if no transaction is active or if the
        #     row has yet not been mutated within this transaction.
        #
        #   - A dictionary that contains two keys:
        #
        #     - "_inserts" contains a dictionary that maps column names to
        #       new keys/key-value pairs that should be inserted into the
        #       column.
        #
        #     - "_removes" contains a dictionary that maps column names to
        #       the keys/key-value pairs that should be removed from the
        #       column.
        #
        #   - None, if this transaction deletes this row.
        self.__dict__["_mutations"] = {}

        # A dictionary whose keys are the names of columns that must be
        # verified as prerequisites when the transaction commits.  The values
        # in the dictionary are all None.
        self.__dict__["_prereqs"] = {}
985 def __lt__(self
, other
):
986 if not isinstance(other
, Row
):
987 return NotImplemented
988 return bool(self
.__dict
__['uuid'] < other
.__dict
__['uuid'])
990 def __eq__(self
, other
):
991 if not isinstance(other
, Row
):
992 return NotImplemented
993 return bool(self
.__dict
__['uuid'] == other
.__dict
__['uuid'])
996 return int(self
.__dict
__['uuid'])
999 return "{table}({data})".format(
1000 table
=self
._table
.name
,
1001 data
=", ".join("{col}={val}".format(col
=c
, val
=getattr(self
, c
))
1002 for c
in sorted(self
._table
.columns
)))
1004 def __getattr__(self
, column_name
):
1005 assert self
._changes
is not None
1006 assert self
._mutations
is not None
1009 column
= self
._table
.columns
[column_name
]
1011 raise AttributeError("%s instance has no attribute '%s'" %
1012 (self
.__class
__.__name
__, column_name
))
1013 datum
= self
._changes
.get(column_name
)
1015 if '_inserts' in self
._mutations
.keys():
1016 inserts
= self
._mutations
['_inserts'].get(column_name
)
1018 if '_removes' in self
._mutations
.keys():
1019 removes
= self
._mutations
['_removes'].get(column_name
)
1021 if self
._data
is None:
1023 raise AttributeError("%s instance has no attribute '%s'" %
1024 (self
.__class
__.__name
__,
1027 datum
= data
.Datum
.from_python(column
.type,
1030 elif column_name
in self
._data
:
1031 datum
= self
._data
[column_name
]
1032 if column
.type.is_set():
1033 dlist
= datum
.as_list()
1034 if inserts
is not None:
1035 dlist
.extend(list(inserts
))
1036 if removes
is not None:
1037 removes_datum
= data
.Datum
.from_python(column
.type,
1040 removes_list
= removes_datum
.as_list()
1041 dlist
= [x
for x
in dlist
if x
not in removes_list
]
1042 datum
= data
.Datum
.from_python(column
.type, dlist
,
1044 elif column
.type.is_map():
1045 dmap
= datum
.to_python(_uuid_to_row
)
1046 if inserts
is not None:
1047 dmap
.update(inserts
)
1048 if removes
is not None:
1050 if key
not in (inserts
or {}):
1052 datum
= data
.Datum
.from_python(column
.type, dmap
,
1056 raise AttributeError("%s instance has no attribute '%s'" %
1057 (self
.__class
__.__name
__,
1062 return datum
.to_python(_uuid_to_row
)
1064 def __setattr__(self
, column_name
, value
):
1065 assert self
._changes
is not None
1066 assert self
._idl
.txn
1068 if ((self
._table
.name
in self
._idl
.readonly
) and
1069 (column_name
in self
._idl
.readonly
[self
._table
.name
])):
1070 vlog
.warn("attempting to write to readonly column %s"
1074 column
= self
._table
.columns
[column_name
]
1076 datum
= data
.Datum
.from_python(column
.type, value
, _row_to_uuid
)
1077 except error
.Error
as e
:
1079 vlog
.err("attempting to write bad value to column %s (%s)"
1082 # Remove prior version of the Row from the index if it has the indexed
1083 # column set, and the column changing is an indexed column
1084 if hasattr(self
, column_name
):
1085 for idx
in self
._table
.rows
.indexes
.values():
1086 if column_name
in (c
.column
for c
in idx
.columns
):
1088 self
._idl
.txn
._write
(self
, column
, datum
)
1089 for idx
in self
._table
.rows
.indexes
.values():
1090 # Only update the index if indexed columns change
1091 if column_name
in (c
.column
for c
in idx
.columns
):
1094 def addvalue(self
, column_name
, key
):
1095 self
._idl
.txn
._txn
_rows
[self
.uuid
] = self
1096 column
= self
._table
.columns
[column_name
]
1098 data
.Datum
.from_python(column
.type, key
, _row_to_uuid
)
1099 except error
.Error
as e
:
1101 vlog
.err("attempting to write bad value to column %s (%s)"
1104 inserts
= self
._mutations
.setdefault('_inserts', {})
1105 column_value
= inserts
.setdefault(column_name
, set())
1106 column_value
.add(key
)
1108 def delvalue(self
, column_name
, key
):
1109 self
._idl
.txn
._txn
_rows
[self
.uuid
] = self
1110 column
= self
._table
.columns
[column_name
]
1112 data
.Datum
.from_python(column
.type, key
, _row_to_uuid
)
1113 except error
.Error
as e
:
1115 vlog
.err("attempting to delete bad value from column %s (%s)"
1118 removes
= self
._mutations
.setdefault('_removes', {})
1119 column_value
= removes
.setdefault(column_name
, set())
1120 column_value
.add(key
)
1122 def setkey(self
, column_name
, key
, value
):
1123 self
._idl
.txn
._txn
_rows
[self
.uuid
] = self
1124 column
= self
._table
.columns
[column_name
]
1126 data
.Datum
.from_python(column
.type, {key
: value
}, _row_to_uuid
)
1127 except error
.Error
as e
:
1129 vlog
.err("attempting to write bad value to column %s (%s)"
1132 if self
._data
and column_name
in self
._data
:
1133 # Remove existing key/value before updating.
1134 removes
= self
._mutations
.setdefault('_removes', {})
1135 column_value
= removes
.setdefault(column_name
, set())
1136 column_value
.add(key
)
1137 inserts
= self
._mutations
.setdefault('_inserts', {})
1138 column_value
= inserts
.setdefault(column_name
, {})
1139 column_value
[key
] = value
1141 def delkey(self
, column_name
, key
, value
=None):
1142 self
._idl
.txn
._txn
_rows
[self
.uuid
] = self
1145 old_value
= data
.Datum
.to_python(self
._data
[column_name
],
1149 if key
not in old_value
:
1151 if old_value
[key
] != value
:
1153 removes
= self
._mutations
.setdefault('_removes', {})
1154 column_value
= removes
.setdefault(column_name
, set())
1155 column_value
.add(key
)
1159 def from_json(cls
, idl
, table
, uuid
, row_json
):
1161 for column_name
, datum_json
in row_json
.items():
1162 column
= table
.columns
.get(column_name
)
1165 vlog
.warn("unknown column %s in table %s"
1166 % (column_name
, table
.name
))
1169 datum
= ovs
.db
.data
.Datum
.from_json(column
.type, datum_json
)
1170 except error
.Error
as e
:
1172 vlog
.warn("error parsing column %s in table %s: %s"
1173 % (column_name
, table
.name
, e
))
1175 data
[column_name
] = datum
1176 return cls(idl
, table
, uuid
, data
)
1178 def verify(self
, column_name
):
1179 """Causes the original contents of column 'column_name' in this row to
1180 be verified as a prerequisite to completing the transaction. That is,
1181 if 'column_name' changed in this row (or if this row was deleted)
1182 between the time that the IDL originally read its contents and the time
1183 that the transaction commits, then the transaction aborts and
1184 Transaction.commit() returns Transaction.TRY_AGAIN.
1186 The intention is that, to ensure that no transaction commits based on
1187 dirty reads, an application should call Row.verify() on each data item
1188 read as part of a read-modify-write operation.
1190 In some cases Row.verify() reduces to a no-op, because the current
1191 value of the column is already known:
1193 - If this row is a row created by the current transaction (returned
1194 by Transaction.insert()).
1196 - If the column has already been modified within the current
1199 Because of the latter property, always call Row.verify() *before*
1200 modifying the column, for a given read-modify-write.
1202 A transaction must be in progress."""
1203 assert self
._idl
.txn
1204 assert self
._changes
is not None
1205 if not self
._data
or column_name
in self
._changes
:
1208 self
._prereqs
[column_name
] = None
1211 """Deletes this row from its table.
1213 A transaction must be in progress."""
1214 assert self
._idl
.txn
1215 assert self
._changes
is not None
1216 if self
._data
is None:
1217 del self
._idl
.txn
._txn
_rows
[self
.uuid
]
1219 self
._idl
.txn
._txn
_rows
[self
.uuid
] = self
1220 del self
._table
.rows
[self
.uuid
]
1221 self
.__dict
__["_changes"] = None
1223 def fetch(self
, column_name
):
1224 self
._idl
.txn
._fetch
(self
, column_name
)
1226 def increment(self
, column_name
):
1227 """Causes the transaction, when committed, to increment the value of
1228 'column_name' within this row by 1. 'column_name' must have an integer
1229 type. After the transaction commits successfully, the client may
1230 retrieve the final (incremented) value of 'column_name' with
1231 Transaction.get_increment_new_value().
1233 The client could accomplish something similar by reading and writing
1234 and verify()ing columns. However, increment() will never (by itself)
1235 cause a transaction to fail because of a verify error.
1237 The intended use is for incrementing the "next_cfg" column in
1238 the Open_vSwitch table."""
1239 self
._idl
.txn
._increment
(self
, column_name
)
1242 def _uuid_name_from_uuid(uuid
):
1243 return "row%s" % str(uuid
).replace("-", "_")
1246 def _where_uuid_equals(uuid
):
1247 return [["_uuid", "==", ["uuid", str(uuid
)]]]
1250 class _InsertedRow(object):
1251 def __init__(self
, op_index
):
1252 self
.op_index
= op_index
1256 class Transaction(object):
1257 """A transaction may modify the contents of a database by modifying the
1258 values of columns, deleting rows, inserting rows, or adding checks that
1259 columns in the database have not changed ("verify" operations), through
1262 Reading and writing columns and inserting and deleting rows are all
1263 straightforward. The reasons to verify columns are less obvious.
1264 Verification is the key to maintaining transactional integrity. Because
1265 OVSDB handles multiple clients, it can happen that between the time that
1266 OVSDB client A reads a column and writes a new value, OVSDB client B has
1267 written that column. Client A's write should not ordinarily overwrite
1268 client B's, especially if the column in question is a "map" column that
1269 contains several more or less independent data items. If client A adds a
1270 "verify" operation before it writes the column, then the transaction fails
1271 in case client B modifies it first. Client A will then see the new value
1272 of the column and compose a new transaction based on the new contents
1273 written by client B.
1275 When a transaction is complete, which must be before the next call to
1276 Idl.run(), call Transaction.commit() or Transaction.abort().
1278 The life-cycle of a transaction looks like this:
1280 1. Create the transaction and record the initial sequence number:
1282 seqno = idl.change_seqno(idl)
1283 txn = Transaction(idl)
1285 2. Modify the database with Row and Transaction methods.
1287 3. Commit the transaction by calling Transaction.commit(). The first call
1288 to this function probably returns Transaction.INCOMPLETE. The client
1289 must keep calling again along as this remains true, calling Idl.run() in
1290 between to let the IDL do protocol processing. (If the client doesn't
1291 have anything else to do in the meantime, it can use
1292 Transaction.commit_block() to avoid having to loop itself.)
1294 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
1295 to change from the saved 'seqno' (it's possible that it's already
1296 changed, in which case the client should not wait at all), then start
1297 over from step 1. Only a call to Idl.run() will change the return value
1298 of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""
1300 # Status values that Transaction.commit() can return.
1302 # Not yet committed or aborted.
1303 UNCOMMITTED
= "uncommitted"
1304 # Transaction didn't include any changes.
1305 UNCHANGED
= "unchanged"
1306 # Commit in progress, please wait.
1307 INCOMPLETE
= "incomplete"
1308 # ovsdb_idl_txn_abort() called.
1310 # Commit successful.
1312 # Commit failed because a "verify" operation
1313 # reported an inconsistency, due to a network
1314 # problem, or other transient failure. Wait
1315 # for a change, then try again.
1316 TRY_AGAIN
= "try again"
1317 # Server hasn't given us the lock yet.
1318 NOT_LOCKED
= "not locked"
1319 # Commit failed due to a hard error.
1323 def status_to_string(status
):
1324 """Converts one of the status values that Transaction.commit() can
1325 return into a human-readable string.
1327 (The status values are in fact such strings already, so
1328 there's nothing to do.)"""
1331 def __init__(self
, idl
):
1332 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
1333 A given Idl may only have a single active transaction at a time.
1335 A Transaction may modify the contents of a database by assigning new
1336 values to columns (attributes of Row), deleting rows (with
1337 Row.delete()), or inserting rows (with Transaction.insert()). It may
1338 also check that columns in the database have not changed with
1341 When a transaction is complete (which must be before the next call to
1342 Idl.run()), call Transaction.commit() or Transaction.abort()."""
1343 assert idl
.txn
is None
1346 self
._request
_id
= None
1348 self
.dry_run
= False
1350 self
._status
= Transaction
.UNCOMMITTED
1354 self
._inc
_row
= None
1355 self
._inc
_column
= None
1357 self
._fetch
_requests
= []
1359 self
._inserted
_rows
= {} # Map from UUID to _InsertedRow
1361 def add_comment(self
, comment
):
1362 """Appends 'comment' to the comments that will be passed to the OVSDB
1363 server when this transaction is committed. (The comment will be
1364 committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
1365 relatively human-readable form.)"""
1366 self
._comments
.append(comment
)
1368 def wait(self
, poller
):
1369 """Causes poll_block() to wake up if this transaction has completed
1371 if self
._status
not in (Transaction
.UNCOMMITTED
,
1372 Transaction
.INCOMPLETE
):
1373 poller
.immediate_wake()
1375 def _substitute_uuids(self
, json
):
1376 if isinstance(json
, (list, tuple)):
1378 and json
[0] == 'uuid'
1379 and ovs
.ovsuuid
.is_valid_string(json
[1])):
1380 uuid
= ovs
.ovsuuid
.from_string(json
[1])
1381 row
= self
._txn
_rows
.get(uuid
, None)
1382 if row
and row
._data
is None:
1383 return ["named-uuid", _uuid_name_from_uuid(uuid
)]
1385 return [self
._substitute
_uuids
(elem
) for elem
in json
]
1388 def __disassemble(self
):
1391 for row
in self
._txn
_rows
.values():
1392 if row
._changes
is None:
1393 # If we add the deleted row back to rows with _changes == None
1394 # then __getattr__ will not work for the indexes
1395 row
.__dict
__["_changes"] = {}
1396 row
.__dict
__["_mutations"] = {}
1397 row
._table
.rows
[row
.uuid
] = row
1398 elif row
._data
is None:
1399 del row
._table
.rows
[row
.uuid
]
1400 row
.__dict
__["_changes"] = {}
1401 row
.__dict
__["_mutations"] = {}
1402 row
.__dict
__["_prereqs"] = {}
1406 """Attempts to commit 'txn'. Returns the status of the commit
1407 operation, one of the following constants:
1409 Transaction.INCOMPLETE:
1411 The transaction is in progress, but not yet complete. The caller
1412 should call again later, after calling Idl.run() to let the
1413 IDL do OVSDB protocol processing.
1415 Transaction.UNCHANGED:
1417 The transaction is complete. (It didn't actually change the
1418 database, so the IDL didn't send any request to the database
1421 Transaction.ABORTED:
1423 The caller previously called Transaction.abort().
1425 Transaction.SUCCESS:
1427 The transaction was successful. The update made by the
1428 transaction (and possibly other changes made by other database
1429 clients) should already be visible in the IDL.
1431 Transaction.TRY_AGAIN:
1433 The transaction failed for some transient reason, e.g. because a
1434 "verify" operation reported an inconsistency or due to a network
1435 problem. The caller should wait for a change to the database,
1436 then compose a new transaction, and commit the new transaction.
1438 Use Idl.change_seqno to wait for a change in the database. It is
1439 important to use its value *before* the initial call to
1440 Transaction.commit() as the baseline for this purpose, because
1441 the change that one should wait for can happen after the initial
1442 call but before the call that returns Transaction.TRY_AGAIN, and
1443 using some other baseline value in that situation could cause an
1444 indefinite wait if the database rarely changes.
1446 Transaction.NOT_LOCKED:
1448 The transaction failed because the IDL has been configured to
1449 require a database lock (with Idl.set_lock()) but didn't
1450 get it yet or has already lost it.
1452 Committing a transaction rolls back all of the changes that it made to
1453 the IDL's copy of the database. If the transaction commits
1454 successfully, then the database server will send an update and, thus,
1455 the IDL will be updated with the committed changes."""
1456 # The status can only change if we're the active transaction.
1457 # (Otherwise, our status will change only in Idl.run().)
1458 if self
!= self
.idl
.txn
:
1461 # If we need a lock but don't have it, give up quickly.
1462 if self
.idl
.lock_name
and not self
.idl
.has_lock
:
1463 self
._status
= Transaction
.NOT_LOCKED
1464 self
.__disassemble
()
1467 operations
= [self
.idl
._db
.name
]
1469 # Assert that we have the required lock (avoiding a race).
1470 if self
.idl
.lock_name
:
1471 operations
.append({"op": "assert",
1472 "lock": self
.idl
.lock_name
})
1474 # Add prerequisites and declarations of new rows.
1475 for row
in self
._txn
_rows
.values():
1479 for column_name
in row
._prereqs
:
1480 columns
.append(column_name
)
1481 rows
[column_name
] = row
._data
[column_name
].to_json()
1482 operations
.append({"op": "wait",
1483 "table": row
._table
.name
,
1485 "where": _where_uuid_equals(row
.uuid
),
1492 for row
in self
._txn
_rows
.values():
1493 if row
._changes
is None:
1494 if row
._table
.is_root
:
1495 operations
.append({"op": "delete",
1496 "table": row
._table
.name
,
1497 "where": _where_uuid_equals(row
.uuid
)})
1500 # Let ovsdb-server decide whether to really delete it.
1503 op
= {"table": row
._table
.name
}
1504 if row
._data
is None:
1506 op
["uuid-name"] = _uuid_name_from_uuid(row
.uuid
)
1509 op_index
= len(operations
) - 1
1510 self
._inserted
_rows
[row
.uuid
] = _InsertedRow(op_index
)
1513 op
["where"] = _where_uuid_equals(row
.uuid
)
1516 op
["row"] = row_json
1518 for column_name
, datum
in row
._changes
.items():
1519 if row
._data
is not None or not datum
.is_default():
1520 row_json
[column_name
] = (
1521 self
._substitute
_uuids
(datum
.to_json()))
1523 # If anything really changed, consider it an update.
1524 # We can't suppress not-really-changed values earlier
1525 # or transactions would become nonatomic (see the big
1526 # comment inside Transaction._write()).
1527 if (not any_updates
and row
._data
is not None and
1528 row
._data
[column_name
] != datum
):
1531 if row
._data
is None or row_json
:
1532 operations
.append(op
)
1535 op
= {"table": row
._table
.name
}
1537 if row
._data
is None:
1539 op
["where"] = self
._substitute
_uuids
(
1540 _where_uuid_equals(row
.uuid
))
1543 op
["where"] = _where_uuid_equals(row
.uuid
)
1544 op
["mutations"] = []
1545 if '_removes' in row
._mutations
.keys():
1546 for col
, dat
in row
._mutations
['_removes'].items():
1547 column
= row
._table
.columns
[col
]
1548 if column
.type.is_map():
1550 opdat
.append(list(dat
))
1556 datum
= data
.Datum
.from_python(column
.type,
1561 self
._substitute
_uuids
(datum
.to_json()))
1562 opdat
.append(inner_opdat
)
1563 mutation
= [col
, "delete", opdat
]
1564 op
["mutations"].append(mutation
)
1566 if '_inserts' in row
._mutations
.keys():
1567 for col
, val
in row
._mutations
['_inserts'].items():
1568 column
= row
._table
.columns
[col
]
1569 if column
.type.is_map():
1571 datum
= data
.Datum
.from_python(column
.type, val
,
1573 opdat
.append(datum
.as_list())
1579 datum
= data
.Datum
.from_python(column
.type,
1584 self
._substitute
_uuids
(datum
.to_json()))
1585 opdat
.append(inner_opdat
)
1586 mutation
= [col
, "insert", opdat
]
1587 op
["mutations"].append(mutation
)
1590 operations
.append(op
)
1593 if self
._fetch
_requests
:
1594 for fetch
in self
._fetch
_requests
:
1595 fetch
["index"] = len(operations
) - 1
1596 operations
.append({"op": "select",
1597 "table": fetch
["row"]._table
.name
,
1598 "where": self
._substitute
_uuids
(
1599 _where_uuid_equals(fetch
["row"].uuid
)),
1600 "columns": [fetch
["column_name"]]})
1604 if self
._inc
_row
and any_updates
:
1605 self
._inc
_index
= len(operations
) - 1
1607 operations
.append({"op": "mutate",
1608 "table": self
._inc
_row
._table
.name
,
1609 "where": self
._substitute
_uuids
(
1610 _where_uuid_equals(self
._inc
_row
.uuid
)),
1611 "mutations": [[self
._inc
_column
, "+=", 1]]})
1612 operations
.append({"op": "select",
1613 "table": self
._inc
_row
._table
.name
,
1614 "where": self
._substitute
_uuids
(
1615 _where_uuid_equals(self
._inc
_row
.uuid
)),
1616 "columns": [self
._inc
_column
]})
1620 operations
.append({"op": "comment",
1621 "comment": "\n".join(self
._comments
)})
1625 operations
.append({"op": "abort"})
1628 self
._status
= Transaction
.UNCHANGED
1630 msg
= ovs
.jsonrpc
.Message
.create_request("transact", operations
)
1631 self
._request
_id
= msg
.id
1632 if not self
.idl
._session
.send(msg
):
1633 self
.idl
._outstanding
_txns
[self
._request
_id
] = self
1634 self
._status
= Transaction
.INCOMPLETE
1636 self
._status
= Transaction
.TRY_AGAIN
1638 self
.__disassemble
()
1641 def commit_block(self
):
1642 """Attempts to commit this transaction, blocking until the commit
1643 either succeeds or fails. Returns the final commit status, which may
1644 be any Transaction.* value other than Transaction.INCOMPLETE.
1646 This function calls Idl.run() on this transaction'ss IDL, so it may
1647 cause Idl.change_seqno to change."""
1649 status
= self
.commit()
1650 if status
!= Transaction
.INCOMPLETE
:
1655 poller
= ovs
.poller
.Poller()
1656 self
.idl
.wait(poller
)
1660 def get_increment_new_value(self
):
1661 """Returns the final (incremented) value of the column in this
1662 transaction that was set to be incremented by Row.increment. This
1663 transaction must have committed successfully."""
1664 assert self
._status
== Transaction
.SUCCESS
1665 return self
._inc
_new
_value
1668 """Aborts this transaction. If Transaction.commit() has already been
1669 called then the transaction might get committed anyhow."""
1670 self
.__disassemble
()
1671 if self
._status
in (Transaction
.UNCOMMITTED
,
1672 Transaction
.INCOMPLETE
):
1673 self
._status
= Transaction
.ABORTED
1675 def get_error(self
):
1676 """Returns a string representing this transaction's current status,
1677 suitable for use in log messages."""
1678 if self
._status
!= Transaction
.ERROR
:
1679 return Transaction
.status_to_string(self
._status
)
1683 return "no error details available"
1685 def __set_error_json(self
, json
):
1686 if self
._error
is None:
1687 self
._error
= ovs
.json
.to_string(json
)
1689 def get_insert_uuid(self
, uuid
):
1690 """Finds and returns the permanent UUID that the database assigned to a
1691 newly inserted row, given the UUID that Transaction.insert() assigned
1692 locally to that row.
1694 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1695 or if it was assigned by that function and then deleted by Row.delete()
1696 within the same transaction. (Rows that are inserted and then deleted
1697 within a single transaction are never sent to the database server, so
1698 it never assigns them a permanent UUID.)
1700 This transaction must have completed successfully."""
1701 assert self
._status
in (Transaction
.SUCCESS
,
1702 Transaction
.UNCHANGED
)
1703 inserted_row
= self
._inserted
_rows
.get(uuid
)
1705 return inserted_row
.real
1708 def _increment(self
, row
, column
):
1709 assert not self
._inc
_row
1711 self
._inc
_column
= column
1713 def _fetch(self
, row
, column_name
):
1714 self
._fetch
_requests
.append({"row": row
, "column_name": column_name
})
1716 def _write(self
, row
, column
, datum
):
1717 assert row
._changes
is not None
1718 assert row
._mutations
is not None
1722 # If this is a write-only column and the datum being written is the
1723 # same as the one already there, just skip the update entirely. This
1724 # is worth optimizing because we have a lot of columns that get
1725 # periodically refreshed into the database but don't actually change
1728 # We don't do this for read/write columns because that would break
1729 # atomicity of transactions--some other client might have written a
1730 # different value in that column since we read it. (But if a whole
1731 # transaction only does writes of existing values, without making any
1732 # real changes, we will drop the whole transaction later in
1733 # ovsdb_idl_txn_commit().)
1734 if (not column
.alert
and row
._data
and
1735 row
._data
.get(column
.name
) == datum
):
1736 new_value
= row
._changes
.get(column
.name
)
1737 if new_value
is None or new_value
== datum
:
1740 txn
._txn
_rows
[row
.uuid
] = row
1741 if '_inserts' in row
._mutations
:
1742 row
._mutations
['_inserts'].pop(column
.name
, None)
1743 if '_removes' in row
._mutations
:
1744 row
._mutations
['_removes'].pop(column
.name
, None)
1745 row
._changes
[column
.name
] = datum
.copy()
1747 def insert(self
, table
, new_uuid
=None):
1748 """Inserts and returns a new row in 'table', which must be one of the
1749 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1751 The new row is assigned a provisional UUID. If 'uuid' is None then one
1752 is randomly generated; otherwise 'uuid' should specify a randomly
1753 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
1754 different UUID when 'txn' is committed, but the IDL will replace any
1755 uses of the provisional UUID in the data to be to be committed by the
1756 UUID assigned by ovsdb-server."""
1757 assert self
._status
== Transaction
.UNCOMMITTED
1758 if new_uuid
is None:
1759 new_uuid
= uuid
.uuid4()
1760 row
= Row(self
.idl
, table
, new_uuid
, None)
1761 table
.rows
[row
.uuid
] = row
1762 self
._txn
_rows
[row
.uuid
] = row
1765 def _process_reply(self
, msg
):
1766 if msg
.type == ovs
.jsonrpc
.Message
.T_ERROR
:
1767 self
._status
= Transaction
.ERROR
1768 elif not isinstance(msg
.result
, (list, tuple)):
1770 vlog
.warn('reply to "transact" is not JSON array')
1779 # This isn't an error in itself but indicates that some
1780 # prior operation failed, so make sure that we know about
1783 elif isinstance(op
, dict):
1784 error
= op
.get("error")
1785 if error
is not None:
1786 if error
== "timed out":
1788 elif error
== "not owner":
1790 elif error
== "aborted":
1794 self
.__set
_error
_json
(op
)
1797 self
.__set
_error
_json
(op
)
1799 vlog
.warn("operation reply is not JSON null or object")
1801 if not soft_errors
and not hard_errors
and not lock_errors
:
1802 if self
._inc
_row
and not self
.__process
_inc
_reply
(ops
):
1804 if self
._fetch
_requests
:
1805 if self
.__process
_fetch
_reply
(ops
):
1806 self
.idl
.change_seqno
+= 1
1810 for insert
in self
._inserted
_rows
.values():
1811 if not self
.__process
_insert
_reply
(insert
, ops
):
1815 self
._status
= Transaction
.ERROR
1817 self
._status
= Transaction
.NOT_LOCKED
1819 self
._status
= Transaction
.TRY_AGAIN
1821 self
._status
= Transaction
.SUCCESS
1824 def __check_json_type(json
, types
, name
):
1827 vlog
.warn("%s is missing" % name
)
1829 elif not isinstance(json
, tuple(types
)):
1831 vlog
.warn("%s has unexpected type %s" % (name
, type(json
)))
1836 def __process_fetch_reply(self
, ops
):
1838 for fetch_request
in self
._fetch
_requests
:
1839 row
= fetch_request
["row"]
1840 column_name
= fetch_request
["column_name"]
1841 index
= fetch_request
["index"]
1845 fetched_rows
= select
.get("rows")
1846 if not Transaction
.__check
_json
_type
(fetched_rows
, (list, tuple),
1847 '"select" reply "rows"'):
1849 if len(fetched_rows
) != 1:
1851 vlog
.warn('"select" reply "rows" has %d elements '
1852 'instead of 1' % len(fetched_rows
))
1854 fetched_row
= fetched_rows
[0]
1855 if not Transaction
.__check
_json
_type
(fetched_row
, (dict,),
1856 '"select" reply row'):
1859 column
= table
.columns
.get(column_name
)
1860 datum_json
= fetched_row
.get(column_name
)
1861 datum
= data
.Datum
.from_json(column
.type, datum_json
)
1863 row
._data
[column_name
] = datum
1868 def __process_inc_reply(self
, ops
):
1869 if self
._inc
_index
+ 2 > len(ops
):
1871 vlog
.warn("reply does not contain enough operations for "
1872 "increment (has %d, needs %d)" %
1873 (len(ops
), self
._inc
_index
+ 2))
1875 # We know that this is a JSON object because the loop in
1876 # __process_reply() already checked.
1877 mutate
= ops
[self
._inc
_index
]
1878 count
= mutate
.get("count")
1879 if not Transaction
.__check
_json
_type
(count
, (int,),
1880 '"mutate" reply "count"'):
1884 vlog
.warn('"mutate" reply "count" is %d instead of 1' % count
)
1887 select
= ops
[self
._inc
_index
+ 1]
1888 rows
= select
.get("rows")
1889 if not Transaction
.__check
_json
_type
(rows
, (list, tuple),
1890 '"select" reply "rows"'):
1894 vlog
.warn('"select" reply "rows" has %d elements '
1895 'instead of 1' % len(rows
))
1898 if not Transaction
.__check
_json
_type
(row
, (dict,),
1899 '"select" reply row'):
1901 column
= row
.get(self
._inc
_column
)
1902 if not Transaction
.__check
_json
_type
(column
, (int,),
1903 '"select" reply inc column'):
1905 self
._inc
_new
_value
= column
1908 def __process_insert_reply(self
, insert
, ops
):
1909 if insert
.op_index
>= len(ops
):
1911 vlog
.warn("reply does not contain enough operations "
1912 "for insert (has %d, needs %d)"
1913 % (len(ops
), insert
.op_index
))
1916 # We know that this is a JSON object because the loop in
1917 # __process_reply() already checked.
1918 reply
= ops
[insert
.op_index
]
1919 json_uuid
= reply
.get("uuid")
1920 if not Transaction
.__check
_json
_type
(json_uuid
, (tuple, list),
1921 '"insert" reply "uuid"'):
1925 uuid_
= ovs
.ovsuuid
.from_json(json_uuid
)
1928 vlog
.warn('"insert" reply "uuid" is not a JSON UUID')
1935 class SchemaHelper(object):
1936 """IDL Schema helper.
1938 This class encapsulates the logic required to generate schemas suitable
1939 for creating 'ovs.db.idl.Idl' objects. Clients should register columns
1940 they are interested in using register_columns(). When finished, the
1941 get_idl_schema() function may be called.
1943 The location on disk of the schema used may be found in the
1944 'schema_location' variable."""
1946 def __init__(self
, location
=None, schema_json
=None):
1947 """Creates a new Schema object.
1949 'location' file path to ovs schema. None means default location
1950 'schema_json' schema in json preresentation in memory
1953 if location
and schema_json
:
1954 raise ValueError("both location and schema_json can't be "
1955 "specified. it's ambiguous.")
1956 if schema_json
is None:
1957 if location
is None:
1958 location
= "%s/vswitch.ovsschema" % ovs
.dirs
.PKGDATADIR
1959 schema_json
= ovs
.json
.from_file(location
)
1961 self
.schema_json
= schema_json
1966 def register_columns(self
, table
, columns
, readonly
=[]):
1967 """Registers interest in the given 'columns' of 'table'. Future calls
1968 to get_idl_schema() will include 'table':column for each column in
1969 'columns'. This function automatically avoids adding duplicate entries
1971 A subset of 'columns' can be specified as 'readonly'. The readonly
1972 columns are not replicated but can be fetched on-demand by the user
1975 'table' must be a string.
1976 'columns' must be a list of strings.
1977 'readonly' must be a list of strings.
1980 assert isinstance(table
, str)
1981 assert isinstance(columns
, list)
1983 columns
= set(columns
) | self
._tables
.get(table
, set())
1984 self
._tables
[table
] = columns
1985 self
._readonly
[table
] = readonly
1987 def register_table(self
, table
):
1988 """Registers interest in the given all columns of 'table'. Future calls
1989 to get_idl_schema() will include all columns of 'table'.
1991 'table' must be a string
1993 assert isinstance(table
, str)
1994 self
._tables
[table
] = set() # empty set means all columns in the table
1996 def register_all(self
):
1997 """Registers interest in every column of every table."""
2000 def get_idl_schema(self
):
2001 """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
2002 object based on columns registered using the register_columns()
2005 schema
= ovs
.db
.schema
.DbSchema
.from_json(self
.schema_json
)
2006 self
.schema_json
= None
2010 for table
, columns
in self
._tables
.items():
2011 schema_tables
[table
] = (
2012 self
._keep
_table
_columns
(schema
, table
, columns
))
2014 schema
.tables
= schema_tables
2015 schema
.readonly
= self
._readonly
2018 def _keep_table_columns(self
, schema
, table_name
, columns
):
2019 assert table_name
in schema
.tables
2020 table
= schema
.tables
[table_name
]
2023 # empty set means all columns in the table
2027 for column_name
in columns
:
2028 assert isinstance(column_name
, str)
2029 assert column_name
in table
.columns
2031 new_columns
[column_name
] = table
.columns
[column_name
]
2033 table
.columns
= new_columns