]>
Commit | Line | Data |
---|---|---|
0164e367 | 1 | # Copyright (c) 2009, 2010, 2011, 2012, 2013, 2016 Nicira, Inc. |
99155935 BP |
2 | # |
3 | # Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | # you may not use this file except in compliance with the License. | |
5 | # You may obtain a copy of the License at: | |
6 | # | |
7 | # http://www.apache.org/licenses/LICENSE-2.0 | |
8 | # | |
9 | # Unless required by applicable law or agreed to in writing, software | |
10 | # distributed under the License is distributed on an "AS IS" BASIS, | |
11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | # See the License for the specific language governing permissions and | |
13 | # limitations under the License. | |
14 | ||
fbafc3c2 | 15 | import functools |
8cdf0349 | 16 | import uuid |
99155935 | 17 | |
a59912a0 | 18 | import ovs.db.data as data |
4c0f6271 | 19 | import ovs.db.parser |
99155935 | 20 | import ovs.db.schema |
6c7050b5 | 21 | import ovs.jsonrpc |
99155935 | 22 | import ovs.ovsuuid |
8cdf0349 | 23 | import ovs.poller |
3a656eaf | 24 | import ovs.vlog |
13973bc4 | 25 | from ovs.db import custom_index |
6c7050b5 | 26 | from ovs.db import error |
27 | ||
# Logger for this module, created under the name "idl".
vlog = ovs.vlog.Vlog("idl")

# Directive string for the pychecker tool; not read by the visible code.
__pychecker__ = 'no-classattr no-objattrs'

# Event names passed as the 'event' argument of Idl.notify().
ROW_CREATE = "create"
ROW_UPDATE = "update"
ROW_DELETE = "delete"

# Selectors for the update format handed to Idl.__parse_update():
# OVSDB_UPDATE is used for "update" notifications and "monitor" replies,
# OVSDB_UPDATE2 for "update2" notifications and "monitor_cond" replies.
OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1

# NOTE(review): not referenced in the visible part of this file; presumably
# compared against a column of the _Server database -- confirm.
CLUSTERED = "clustered"
40 | ||
d7d417fc TW |
41 | |
42 | class Idl(object): | |
99155935 BP |
43 | """Open vSwitch Database Interface Definition Language (OVSDB IDL). |
44 | ||
45 | The OVSDB IDL maintains an in-memory replica of a database. It issues RPC | |
46 | requests to an OVSDB database server and parses the responses, converting | |
47 | raw JSON into data structures that are easier for clients to digest. | |
48 | ||
49 | The IDL also assists with issuing database transactions. The client | |
50 | creates a transaction, manipulates the IDL data structures, and commits or | |
51 | aborts the transaction. The IDL then composes and issues the necessary | |
52 | JSON-RPC requests and reports to the client whether the transaction | |
53 | completed successfully. | |
54 | ||
8cdf0349 BP |
55 | The client is allowed to access the following attributes directly, in a |
56 | read-only fashion: | |
99155935 | 57 | |
8cdf0349 BP |
58 | - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided |
59 | to the Idl constructor. Each ovs.db.schema.TableSchema in the map is | |
60 | annotated with a new attribute 'rows', which is a dict from a uuid.UUID | |
61 | to a Row object. | |
62 | ||
63 | The client may directly read and write the Row objects referenced by the | |
64 | 'rows' map values. Refer to Row for more details. | |
65 | ||
66 | - 'change_seqno': A number that represents the IDL's state. When the IDL | |
2f926787 BP |
67 | is updated (by Idl.run()), its value changes. The sequence number can |
68 | occasionally change even if the database does not. This happens if the | |
69 | connection to the database drops and reconnects, which causes the | |
70 | database contents to be reloaded even if they didn't change. (It could | |
71 | also happen if the database server sends out a "change" that reflects | |
72 | what the IDL already thought was in the database. The database server is | |
73 | not supposed to do that, but bugs could in theory cause it to do so.) | |
8cdf0349 BP |
74 | |
75 | - 'lock_name': The name of the lock configured with Idl.set_lock(), or None | |
76 | if no lock is configured. | |
77 | ||
78 | - 'has_lock': True, if the IDL is configured to obtain a lock and owns that | |
79 | lock, and False otherwise. | |
80 | ||
81 | Locking and unlocking happens asynchronously from the database client's | |
82 | point of view, so the information is only useful for optimization | |
83 | (e.g. if the client doesn't have the lock then there's no point in trying | |
84 | to write to the database). | |
85 | ||
86 | - 'is_lock_contended': True, if the IDL is configured to obtain a lock but | |
87 | the database server has indicated that some other client already owns the | |
88 | requested lock, and False otherwise. | |
89 | ||
90 | - 'txn': The ovs.db.idl.Transaction object for the database transaction | |
91 | currently being constructed, if there is one, or None otherwise. | |
92 | """ | |
93 | ||
897c8064 | 94 | IDL_S_INITIAL = 0 |
c39751e4 TE |
95 | IDL_S_SERVER_SCHEMA_REQUESTED = 1 |
96 | IDL_S_SERVER_MONITOR_REQUESTED = 2 | |
97 | IDL_S_DATA_MONITOR_REQUESTED = 3 | |
98 | IDL_S_DATA_MONITOR_COND_REQUESTED = 4 | |
897c8064 | 99 | |
c39751e4 TE |
def __init__(self, remote, schema_helper, probe_interval=None,
             leader_only=True):
    """Creates an IDL for the database described by 'schema_helper' and
    opens a JSON-RPC session to 'remote'.  The IDL will maintain an
    in-memory replica of the remote database.

    'remote' is a single remote, or several remotes separated by commas.
    Each remote should be in a form acceptable to
    ovs.jsonrpc.session.open().

    'schema_helper' must be an instance of the SchemaHelper class, which
    generates the schema for the remote database.  The caller may have cut
    it down by removing tables or columns that are not of interest.  The
    IDL will only replicate the tables and columns that remain.  The
    caller may also add an attribute named 'alert' to selected remaining
    columns, setting its value to False; if so, then changes to those
    columns will not be considered changes to the database for the purpose
    of the return value of Idl.run() and Idl.change_seqno.  This is useful
    for columns that the IDL's client will write but not read.

    The IDL uses and modifies the schema obtained from 'schema_helper'
    directly: every table is annotated with 'rows', 'idl', 'need_table',
    'condition' and 'cond_changed' attributes.

    If 'leader_only' is set to True (default value) the IDL will only
    monitor and transact with the leader of the cluster.

    If 'probe_interval' is zero it disables the connection keepalive
    feature.  If non-zero the value will be forced to at least 1000
    milliseconds.  If None it will just use the default value in OVS.
    """

    assert isinstance(schema_helper, SchemaHelper)
    schema = schema_helper.get_idl_schema()

    self.tables = schema.tables
    self.readonly = schema.readonly
    self._db = schema
    remotes = self._parse_remotes(remote)
    self._session = ovs.jsonrpc.Session.open_multiple(
        remotes, probe_interval=probe_interval)
    self._monitor_request_id = None
    self._last_seqno = None
    self.change_seqno = 0
    # Monitor id sent with data monitor requests; replaced on every
    # condition change.
    self.uuid = uuid.uuid1()

    # Server monitor.
    self._server_schema_request_id = None
    self._server_monitor_request_id = None
    self._db_change_aware_request_id = None
    self._server_db_name = '_Server'
    self._server_db_table = 'Database'
    self.server_tables = None
    self._server_db = None
    self.server_monitor_uuid = uuid.uuid1()
    self.leader_only = leader_only
    self.cluster_id = None
    self._min_index = 0

    self.state = self.IDL_S_INITIAL

    # Database locking.
    self.lock_name = None           # Name of lock we need, None if none.
    self.has_lock = False           # Has db server said we have the lock?
    self.is_lock_contended = False  # Has db server said we can't get lock?
    self._lock_request_id = None    # JSON-RPC ID of in-flight lock request.

    # Transaction support.
    self.txn = None
    self._outstanding_txns = {}

    for table in schema.tables.values():
        for column in table.columns.values():
            # Columns alert by default; the caller opts out per column.
            if not hasattr(column, 'alert'):
                column.alert = True
        table.need_table = False
        table.rows = custom_index.IndexedRows(table)
        table.idl = self
        # [True] matches every row, i.e. the table starts unconditioned.
        table.condition = [True]
        table.cond_changed = False
99155935 | 181 | |
31e434fc NS |
182 | def _parse_remotes(self, remote): |
183 | # If remote is - | |
184 | # "tcp:10.0.0.1:6641,unix:/tmp/db.sock,t,s,tcp:10.0.0.2:6642" | |
185 | # this function returns | |
186 | # ["tcp:10.0.0.1:6641", "unix:/tmp/db.sock,t,s", tcp:10.0.0.2:6642"] | |
187 | remotes = [] | |
188 | for r in remote.split(','): | |
189 | if remotes and r.find(":") == -1: | |
190 | remotes[-1] += "," + r | |
191 | else: | |
192 | remotes.append(r) | |
193 | return remotes | |
194 | ||
c39751e4 TE |
def set_cluster_id(self, cluster_id):
    """Records 'cluster_id' as the id of the cluster that this IDL must
    connect to.  If the IDL has already left its initial state, a
    reconnection is forced."""
    self.cluster_id = cluster_id
    still_initial = self.state == self.IDL_S_INITIAL
    if not still_initial:
        self.force_reconnect()
200 | ||
13973bc4 TW |
def index_create(self, table, name):
    """Creates an index called 'name' on the rows of the table named
    'table' and returns whatever the rows container's index_create()
    returns."""
    indexed_rows = self.tables[table].rows
    return indexed_rows.index_create(name)
204 | ||
def index_irange(self, table, name, start, end):
    """Returns the items of index 'name' on table 'table' that lie between
    'start' and 'end', inclusive."""
    index = self.tables[table].rows.indexes[name]
    return index.irange(start, end)
208 | ||
def index_equal(self, table, name, value):
    """Returns the items of index 'name' on table 'table' that match
    'value', i.e. the inclusive range from 'value' to 'value'."""
    index = self.tables[table].rows.indexes[name]
    return index.irange(value, value)
212 | ||
def close(self):
    """Closes the session to the database.  After this the IDL no longer
    receives updates."""
    session = self._session
    session.close()
99155935 BP |
217 | |
def run(self):
    """Processes a batch of messages from the database server.  Returns
    True if the database as seen through the IDL changed, False if it did
    not change.  The initial fetch of the entire contents of the remote
    database is considered to be one kind of change.  If the IDL has been
    configured to acquire a database lock (with Idl.set_lock()), then
    successfully acquiring the lock is also considered to be a change.

    This function can return occasional false positives, that is, report
    that the database changed even though it didn't.  This happens if the
    connection to the database drops and reconnects, which causes the
    database contents to be reloaded even if they didn't change.  (It could
    also happen if the database server sends out a "change" that reflects
    what we already thought was in the database, but the database server is
    not supposed to do that.)

    As an alternative to checking the return value, the client may check
    for changes in self.change_seqno.

    Must not be called while a transaction is being constructed
    (self.txn must be unset)."""
    assert not self.txn
    initial_change_seqno = self.change_seqno

    # Flush pending per-table condition changes before reading messages.
    self.send_cond_change()
    self._session.run()
    # Handle at most 50 messages per call.
    i = 0
    while i < 50:
        i += 1
        if not self._session.is_connected():
            break

        seqno = self._session.get_seqno()
        if seqno != self._last_seqno:
            # The session's sequence number changed (presumably a
            # (re)connection): abort outstanding transactions and start
            # over by requesting the _Server schema and, if configured,
            # the lock.
            self._last_seqno = seqno
            self.__txn_abort_all()
            self.__send_server_schema_request()
            if self.lock_name:
                self.__send_lock_request()
            break

        msg = self._session.recv()
        if msg is None:
            break

        # Dispatch on message type.  The order of the branches matters:
        # the specific request-id matches must come before the generic
        # transaction-reply branch near the end.
        if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                and msg.method == "update2"
                and len(msg.params) == 2):
            # Database contents changed.
            self.__parse_update(msg.params[1], OVSDB_UPDATE2)
        elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                and msg.method == "update"
                and len(msg.params) == 2):
            # Database contents changed.
            if msg.params[0] == str(self.server_monitor_uuid):
                # Update for the _Server monitor.  It is not reported as
                # a change of the client's database, so change_seqno is
                # restored afterwards.
                self.__parse_update(msg.params[1], OVSDB_UPDATE,
                                    tables=self.server_tables)
                self.change_seqno = initial_change_seqno
                if not self.__check_server_db():
                    self.force_reconnect()
                    break
            else:
                self.__parse_update(msg.params[1], OVSDB_UPDATE)
        elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                and self._monitor_request_id is not None
                and self._monitor_request_id == msg.id):
            # Reply to our "monitor" request.
            try:
                self.change_seqno += 1
                self._monitor_request_id = None
                # Drop the old replica; the reply carries the full
                # contents.
                self.__clear()
                if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
                    self.__parse_update(msg.result, OVSDB_UPDATE2)
                else:
                    assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
                    self.__parse_update(msg.result, OVSDB_UPDATE)

            except error.Error as e:
                vlog.err("%s: parse error in received schema: %s"
                         % (self._session.get_name(), e))
                self.__error()
        elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                and self._server_schema_request_id is not None
                and self._server_schema_request_id == msg.id):
            # Reply to our "get_schema" of _Server request.
            try:
                self._server_schema_request_id = None
                sh = SchemaHelper(None, msg.result)
                sh.register_table(self._server_db_table)
                schema = sh.get_idl_schema()
                self._server_db = schema
                self.server_tables = schema.tables
                self.__send_server_monitor_request()
            except error.Error as e:
                vlog.err("%s: error receiving server schema: %s"
                         % (self._session.get_name(), e))
                if self.cluster_id:
                    # A cluster id was requested, so the _Server database
                    # is required: treat this as an error.
                    self.__error()
                    break
                else:
                    # No cluster id requested: monitor the data directly.
                    self.change_seqno = initial_change_seqno
                    self.__send_monitor_request()
        elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                and self._server_monitor_request_id is not None
                and self._server_monitor_request_id == msg.id):
            # Reply to our "monitor" of _Server request.
            try:
                self._server_monitor_request_id = None
                self.__parse_update(msg.result, OVSDB_UPDATE,
                                    tables=self.server_tables)
                self.change_seqno = initial_change_seqno
                if self.__check_server_db():
                    self.__send_monitor_request()
                    self.__send_db_change_aware()
                else:
                    self.force_reconnect()
                    break
            except error.Error as e:
                vlog.err("%s: parse error in received schema: %s"
                         % (self._session.get_name(), e))
                if self.cluster_id:
                    self.__error()
                    break
                else:
                    self.change_seqno = initial_change_seqno
                    self.__send_monitor_request()
        elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                and self._db_change_aware_request_id is not None
                and self._db_change_aware_request_id == msg.id):
            # Reply to us notifying the server of our change awareness.
            self._db_change_aware_request_id = None
        elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                and self._lock_request_id is not None
                and self._lock_request_id == msg.id):
            # Reply to our "lock" request.
            self.__parse_lock_reply(msg.result)
        elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                and msg.method == "locked"):
            # We got our lock.
            self.__parse_lock_notify(msg.params, True)
        elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                and msg.method == "stolen"):
            # Someone else stole our lock.
            self.__parse_lock_notify(msg.params, False)
        elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
            # Reply to our echo request.  Ignore it.
            pass
        elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
                self._monitor_request_id == msg.id):
            # Our "monitor_cond" request failed.  If the server does not
            # know the method, retry; __send_monitor_request() picks plain
            # "monitor" because of the current state.
            if msg.error == "unknown method":
                self.__send_monitor_request()
        elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                self._server_schema_request_id is not None and
                self._server_schema_request_id == msg.id):
            # Our "get_schema" of _Server request failed.
            self._server_schema_request_id = None
            if self.cluster_id:
                self.force_reconnect()
                break
            else:
                self.change_seqno = initial_change_seqno
                self.__send_monitor_request()
        elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                           ovs.jsonrpc.Message.T_REPLY)
                and self.__txn_process_reply(msg)):
            # __txn_process_reply() did everything needed.
            pass
        else:
            # This can happen if a transaction is destroyed before we
            # receive the reply, so keep the log level low.
            vlog.dbg("%s: received unexpected %s message"
                     % (self._session.get_name(),
                         ovs.jsonrpc.Message.type_to_string(msg.type)))

    return initial_change_seqno != self.change_seqno
390 | ||
16ebb90e LS |
def send_cond_change(self):
    """Sends a condition-change request for every table whose condition
    is flagged as changed, then clears the flag.  Does nothing while the
    session is not connected."""
    if self._session.is_connected():
        for tbl in self.tables.values():
            if not tbl.cond_changed:
                continue
            self.__send_cond_change(tbl, tbl.condition)
            tbl.cond_changed = False
399 | ||
0164e367 BP |
def cond_change(self, table_name, cond):
    """Sets the condition for 'table_name' to 'cond', a conditional
    expression suitable for use directly in the OVSDB protocol.  The one
    exception is the empty condition [], which here matches no rows
    (it is stored as [False]) instead of matching every row.

    The table is only flagged as changed when the new condition differs
    from the current one.  Raises error.Error if 'table_name' is not a
    known table.
    """
    tbl = self.tables.get(table_name)
    if not tbl:
        raise error.Error('Unknown table "%s"' % table_name)

    wanted = [False] if cond == [] else cond
    if tbl.condition == wanted:
        return
    tbl.condition = wanted
    tbl.cond_changed = True
897c8064 | 417 | |
99155935 BP |
def wait(self, poller):
    """Registers with 'poller' so that poller.block() wakes up when
    self.run() has something to do or when activity occurs on a
    transaction on 'self'."""
    session = self._session
    session.wait(poller)
    session.recv_wait(poller)
99155935 | 423 | |
8cdf0349 BP |
def has_ever_connected(self):
    """Returns True if the IDL has at some point connected to the remote
    database and retrieved its contents, even if the connection later
    dropped and is being re-established.  In that case the IDL holds an
    atomic snapshot of the database's contents, although it might be
    arbitrarily old.

    Returns False if the IDL has never connected or never retrieved the
    database's contents, in which case the IDL is empty."""
    seqno = self.change_seqno
    return seqno != 0
434 | ||
def force_reconnect(self):
    """Makes the IDL drop its connection to the database and reconnect.
    The contents of the IDL do not change in the meantime."""
    session = self._session
    session.force_reconnect()
439 | ||
c39751e4 TE |
def session_name(self):
    """Returns the name reported by the underlying JSON-RPC session."""
    session = self._session
    return session.get_name()
442 | ||
8cdf0349 BP |
def set_lock(self, lock_name):
    """Configures which database lock, if any, the IDL should hold.

    If 'lock_name' is not None, the IDL is set up to obtain the named lock
    from the database server and to avoid modifying the database when the
    lock cannot be acquired (that is, when another client has the same
    lock).

    If 'lock_name' is None, the locking requirement is dropped and the
    lock is released.

    Must not be called while a transaction is being constructed or is
    outstanding."""
    assert not self.txn
    assert not self._outstanding_txns

    held = self.lock_name
    must_release = held and (not lock_name or lock_name != held)
    if must_release:
        # Release previous lock.
        self.__send_unlock_request()
        self.lock_name = None
        self.is_lock_contended = False

    must_acquire = lock_name and not self.lock_name
    if must_acquire:
        # Acquire new lock.
        self.lock_name = lock_name
        self.__send_lock_request()
464 | ||
d7d417fc TW |
def notify(self, event, row, updates=None):
    """Hook invoked for row create/update/delete events.

    This implementation does nothing; it exists so that it can be
    overridden to receive notifications.

    :param event:   The event that was triggered
    :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE
    :param row:     The row as it is after the operation has occurred
    :type row:      Row
    :param updates: For updates, a row holding only the old values of the
                    changed columns
    :type updates:  Row
    """
476 | ||
897c8064 LS |
def __send_cond_change(self, table, cond):
    """Sends a "monitor_cond_change" request that sets the monitor
    condition of 'table' to 'cond'.

    A fresh monitor id is generated for every request: the request
    carries the previous id and the new one, and self.uuid is updated to
    the new one."""
    where_clause = {table.name: [{"where": cond}]}
    previous_id = str(self.uuid)
    self.uuid = uuid.uuid1()
    request = ovs.jsonrpc.Message.create_request(
        "monitor_cond_change",
        [previous_id, str(self.uuid), where_clause])
    self._session.send(request)
484 | ||
8cdf0349 BP |
485 | def __clear(self): |
486 | changed = False | |
487 | ||
0c4d144a | 488 | for table in self.tables.values(): |
8cdf0349 BP |
489 | if table.rows: |
490 | changed = True | |
13973bc4 | 491 | table.rows = custom_index.IndexedRows(table) |
99155935 | 492 | |
8cdf0349 BP |
493 | if changed: |
494 | self.change_seqno += 1 | |
495 | ||
496 | def __update_has_lock(self, new_has_lock): | |
497 | if new_has_lock and not self.has_lock: | |
498 | if self._monitor_request_id is None: | |
499 | self.change_seqno += 1 | |
500 | else: | |
501 | # We're waiting for a monitor reply, so don't signal that the | |
502 | # database changed. The monitor reply will increment | |
503 | # change_seqno anyhow. | |
504 | pass | |
505 | self.is_lock_contended = False | |
506 | self.has_lock = new_has_lock | |
507 | ||
508 | def __do_send_lock_request(self, method): | |
509 | self.__update_has_lock(False) | |
510 | self._lock_request_id = None | |
511 | if self._session.is_connected(): | |
512 | msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name]) | |
513 | msg_id = msg.id | |
514 | self._session.send(msg) | |
515 | else: | |
516 | msg_id = None | |
517 | return msg_id | |
518 | ||
519 | def __send_lock_request(self): | |
520 | self._lock_request_id = self.__do_send_lock_request("lock") | |
521 | ||
522 | def __send_unlock_request(self): | |
523 | self.__do_send_lock_request("unlock") | |
524 | ||
525 | def __parse_lock_reply(self, result): | |
526 | self._lock_request_id = None | |
da2d45c6 | 527 | got_lock = isinstance(result, dict) and result.get("locked") is True |
8cdf0349 BP |
528 | self.__update_has_lock(got_lock) |
529 | if not got_lock: | |
530 | self.is_lock_contended = True | |
531 | ||
532 | def __parse_lock_notify(self, params, new_has_lock): | |
533 | if (self.lock_name is not None | |
da2d45c6 | 534 | and isinstance(params, (list, tuple)) |
8cdf0349 BP |
535 | and params |
536 | and params[0] == self.lock_name): | |
e9ad9219 | 537 | self.__update_has_lock(new_has_lock) |
8cdf0349 BP |
538 | if not new_has_lock: |
539 | self.is_lock_contended = True | |
99155935 | 540 | |
c39751e4 TE |
def __send_db_change_aware(self):
    """Sends a "set_db_change_aware" request with parameter True and
    remembers its JSON-RPC id so that run() can recognize the reply."""
    request = ovs.jsonrpc.Message.create_request(
        "set_db_change_aware", [True])
    self._db_change_aware_request_id = request.id
    self._session.send(request)
546 | ||
def __send_monitor_request(self):
    """Sends the monitor request for the client's database and records
    its JSON-RPC id in self._monitor_request_id.

    When called from the initial or the "server monitor requested" state
    a "monitor_cond" request is sent; from any other state (e.g. after
    "monitor_cond" was rejected) a plain "monitor" request is sent.  The
    state is advanced accordingly.

    For each table the request lists every column except those named for
    that table in self.readonly.  With "monitor_cond", a table whose
    condition is not [True] also gets a "where" clause; since the
    condition is then part of the request, the table's pending
    'cond_changed' flag is cleared."""
    if self.state in (self.IDL_S_SERVER_MONITOR_REQUESTED,
                      self.IDL_S_INITIAL):
        self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
        method = "monitor_cond"
    else:
        self.state = self.IDL_S_DATA_MONITOR_REQUESTED
        method = "monitor"

    monitor_requests = {}
    for table in self.tables.values():
        if table.name in self.readonly:
            excluded = self.readonly[table.name]
            columns = [column for column in table.columns.keys()
                       if column not in excluded]
        else:
            columns = list(table.columns.keys())

        monitor_request = {"columns": columns}
        if method == "monitor_cond" and table.condition != [True]:
            monitor_request["where"] = table.condition
            # This used to assign to a misspelled 'cond_change' attribute,
            # which left the real flag set.
            table.cond_changed = False
        monitor_requests[table.name] = [monitor_request]

    msg = ovs.jsonrpc.Message.create_request(
        method, [self._db.name, str(self.uuid), monitor_requests])
    self._monitor_request_id = msg.id
    self._session.send(msg)
99155935 | 574 | |
c39751e4 TE |
def __send_server_schema_request(self):
    """Moves to the "server schema requested" state and sends a
    "get_schema" request for the _Server database, remembering the
    request's JSON-RPC id."""
    self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
    params = [self._server_db_name, str(self.uuid)]
    request = ovs.jsonrpc.Message.create_request("get_schema", params)
    self._server_schema_request_id = request.id
    self._session.send(request)
581 | ||
def __send_server_monitor_request(self):
    """Moves to the "server monitor requested" state and sends a "monitor"
    request for all columns of the _Server database's table.

    The server table is first prepared like the client's tables: every
    column gets an 'alert' attribute if it lacks one, and the table gets
    a fresh row container plus 'need_table' and 'idl' attributes."""
    self.state = self.IDL_S_SERVER_MONITOR_REQUESTED

    server_table = self.server_tables[self._server_db_table]
    column_names = list(server_table.columns.keys())
    for col in server_table.columns.values():
        if not hasattr(col, 'alert'):
            col.alert = True
    server_table.rows = custom_index.IndexedRows(server_table)
    server_table.need_table = False
    server_table.idl = self

    requests = {server_table.name: [{"columns": column_names}]}
    request = ovs.jsonrpc.Message.create_request(
        'monitor',
        [self._server_db.name, str(self.server_monitor_uuid), requests])
    self._server_monitor_request_id = request.id
    self._session.send(request)
601 | ||
602 | def __parse_update(self, update, version, tables=None): | |
99155935 | 603 | try: |
c39751e4 TE |
604 | if not tables: |
605 | self.__do_parse_update(update, version, self.tables) | |
606 | else: | |
607 | self.__do_parse_update(update, version, tables) | |
3ab76c56 | 608 | except error.Error as e: |
3a656eaf EJ |
609 | vlog.err("%s: error parsing update: %s" |
610 | % (self._session.get_name(), e)) | |
99155935 | 611 | |
c39751e4 | 612 | def __do_parse_update(self, table_updates, version, tables): |
da2d45c6 | 613 | if not isinstance(table_updates, dict): |
99155935 BP |
614 | raise error.Error("<table-updates> is not an object", |
615 | table_updates) | |
616 | ||
0c4d144a | 617 | for table_name, table_update in table_updates.items(): |
c39751e4 | 618 | table = tables.get(table_name) |
99155935 | 619 | if not table: |
f2d8ad13 BP |
620 | raise error.Error('<table-updates> includes unknown ' |
621 | 'table "%s"' % table_name) | |
99155935 | 622 | |
da2d45c6 | 623 | if not isinstance(table_update, dict): |
f2d8ad13 BP |
624 | raise error.Error('<table-update> for table "%s" is not ' |
625 | 'an object' % table_name, table_update) | |
99155935 | 626 | |
0c4d144a | 627 | for uuid_string, row_update in table_update.items(): |
49c541dc | 628 | if not ovs.ovsuuid.is_valid_string(uuid_string): |
f2d8ad13 BP |
629 | raise error.Error('<table-update> for table "%s" ' |
630 | 'contains bad UUID "%s" as member ' | |
631 | 'name' % (table_name, uuid_string), | |
99155935 | 632 | table_update) |
49c541dc | 633 | uuid = ovs.ovsuuid.from_string(uuid_string) |
99155935 | 634 | |
da2d45c6 | 635 | if not isinstance(row_update, dict): |
f2d8ad13 BP |
636 | raise error.Error('<table-update> for table "%s" ' |
637 | 'contains <row-update> for %s that ' | |
638 | 'is not an object' | |
99155935 BP |
639 | % (table_name, uuid_string)) |
640 | ||
897c8064 LS |
641 | if version == OVSDB_UPDATE2: |
642 | if self.__process_update2(table, uuid, row_update): | |
643 | self.change_seqno += 1 | |
644 | continue | |
645 | ||
af1eba26 | 646 | parser = ovs.db.parser.Parser(row_update, "row-update") |
4c0f6271 BP |
647 | old = parser.get_optional("old", [dict]) |
648 | new = parser.get_optional("new", [dict]) | |
649 | parser.finish() | |
99155935 | 650 | |
99155935 | 651 | if not old and not new: |
f2d8ad13 BP |
652 | raise error.Error('<row-update> missing "old" and ' |
653 | '"new" members', row_update) | |
99155935 | 654 | |
8cdf0349 | 655 | if self.__process_update(table, uuid, old, new): |
99155935 BP |
656 | self.change_seqno += 1 |
657 | ||
897c8064 LS |
def __process_update2(self, table, uuid, row_update):
    """Applies one "update2"-format 'row_update' for row 'uuid' of 'table'.

    'row_update' must contain one of these members:

      - "delete": the row is removed and ROW_DELETE is notified.  A
        missing row is only logged.
      - "insert" or "initial": a new row is created from the member's
        value plus column defaults, replacing (with a warning) any
        existing row.  ROW_CREATE is notified if the row update reports
        a change.
      - "modify": the member's value is applied as a diff to the existing
        row and ROW_UPDATE is notified with the old values.

    Returns True if the replica changed, False otherwise.  Raises
    error.Error for a "modify" of a row that does not exist or for an
    unknown operation."""
    row = table.rows.get(uuid)
    changed = False
    if "delete" in row_update:
        if row:
            del table.rows[uuid]
            self.notify(ROW_DELETE, row)
            changed = True
        else:
            # XXX rate-limit
            # The original message lacked the space before the table name.
            vlog.warn("cannot delete missing row %s from table %s"
                      % (uuid, table.name))
    elif "insert" in row_update or "initial" in row_update:
        if row:
            vlog.warn("cannot add existing row %s from table"
                      " %s" % (uuid, table.name))
            del table.rows[uuid]
        row = self.__create_row(table, uuid)
        if "insert" in row_update:
            row_update = row_update['insert']
        else:
            row_update = row_update['initial']
        self.__add_default(table, row_update)
        changed = self.__row_update(table, row, row_update)
        table.rows[uuid] = row
        if changed:
            self.notify(ROW_CREATE, row)
    elif "modify" in row_update:
        if not row:
            raise error.Error('Modify non-existing row')

        old_row = self.__apply_diff(table, row, row_update['modify'])
        self.notify(ROW_UPDATE, row, Row(self, table, uuid, old_row))
        changed = True
    else:
        raise error.Error('<row-update> unknown operation',
                          row_update)
    return changed
696 | ||
def __process_update(self, table, uuid, old, new):
    """Applies one "update"-format change for row 'uuid' of 'table', where
    'old' and 'new' are the optional "old" and "new" members of the row
    update.  Returns True if a column changed, False otherwise."""
    existing = table.rows.get(uuid)

    if not new:
        # Delete row.
        if not existing:
            # XXX rate-limit
            vlog.warn("cannot delete missing row %s from table %s"
                      % (uuid, table.name))
            return False
        del table.rows[uuid]
        self.notify(ROW_DELETE, existing)
        return True

    if not old:
        # Insert row.
        if existing:
            # XXX rate-limit
            vlog.warn("cannot add existing row %s to table %s"
                      % (uuid, table.name))
            row, op, changed = existing, ROW_UPDATE, False
        else:
            row = self.__create_row(table, uuid)
            op, changed = ROW_CREATE, True
        changed |= self.__row_update(table, row, new)
        if op == ROW_CREATE:
            table.rows[uuid] = row
        if changed:
            self.notify(ROW_CREATE, row)
        return changed

    # Modify row.
    if existing:
        row, op, changed = existing, ROW_UPDATE, False
    else:
        row = self.__create_row(table, uuid)
        op, changed = ROW_CREATE, True
        # XXX rate-limit
        vlog.warn("cannot modify missing row %s in table %s"
                  % (uuid, table.name))
    changed |= self.__row_update(table, row, new)
    if op == ROW_CREATE:
        table.rows[uuid] = row
    if changed:
        self.notify(op, row, Row.from_json(self, table, uuid, old))
    return changed
99155935 | 742 | |
c39751e4 TE |
def __check_server_db(self):
    """Returns True if this is a valid server database, False otherwise.

    Looks up the row describing our database in the server table (matched
    by cluster ID prefix when 'cluster_id' is set, otherwise by database
    name) and, for a clustered database reachable through more than one
    remote, rejects servers that have no schema, are disconnected, are not
    the leader when 'leader_only' is set, or report an index lower than
    the highest one seen so far."""
    session_name = self.session_name()
    table_name = self._server_db_table

    if table_name not in self.server_tables:
        vlog.info("%s: server does not have %s table in its %s database"
                  % (session_name, table_name, self._server_db_name))
        return False

    database = None
    for candidate in self.server_tables[table_name].rows.values():
        if self.cluster_id:
            # Only the first four characters of each cluster ID are
            # compared against the configured 'cluster_id'.
            matched = any(self.cluster_id == str(cid)[:4]
                          for cid in candidate.cid)
        else:
            matched = candidate.name == self._db.name
        if matched:
            database = candidate
            break

    if not database:
        vlog.info("%s: server does not have %s database"
                  % (session_name, self._db.name))
        return False

    # The remaining checks only apply to a clustered database when there
    # is another remote to fall back to.
    if (database.model != CLUSTERED or
            not self._session.get_num_of_remotes() > 1):
        return True

    if not database.schema:
        vlog.info('%s: clustered database server has not yet joined '
                  'cluster; trying another server' % session_name)
        return False

    if not database.connected:
        vlog.info('%s: clustered database server is disconnected '
                  'from cluster; trying another server' % session_name)
        return False

    if self.leader_only and not database.leader:
        vlog.info('%s: clustered database server is not cluster '
                  'leader; trying another server' % session_name)
        return False

    if database.index:
        if database.index[0] < self._min_index:
            vlog.warn('%s: clustered database server has stale data; '
                      'trying another server' % session_name)
            return False
        # Remember the newest index seen so that staler servers are
        # rejected from now on.
        self._min_index = database.index[0]

    return True
794 | ||
897c8064 LS |
def __column_name(self, column):
    """Returns the default value of 'column''s key type.

    For a UUID-typed key the default is returned in its JSON form (via
    ovs.ovsuuid.to_json()); for any other type it is returned as is."""
    key_type = column.type.key.type
    is_uuid = key_type == ovs.db.types.UuidType
    return ovs.ovsuuid.to_json(key_type.default) if is_uuid \
        else key_type.default
800 | ||
801 | def __add_default(self, table, row_update): | |
0c4d144a | 802 | for column in table.columns.values(): |
897c8064 LS |
803 | if column.name not in row_update: |
804 | if ((table.name not in self.readonly) or | |
805 | (table.name in self.readonly) and | |
806 | (column.name not in self.readonly[table.name])): | |
807 | if column.type.n_min != 0 and not column.type.is_map(): | |
808 | row_update[column.name] = self.__column_name(column) | |
809 | ||
def __apply_diff(self, table, row, row_diff):
    """Applies the per-column diffs in 'row_diff' to 'row'.

    'row_diff' maps column names to JSON-encoded diffs.  Unknown columns
    and diffs that fail to parse are logged and skipped.  Returns a dict
    mapping each processed column name to a copy of the datum it held
    before the diff was applied."""
    previous = {}
    for name, diff_json in row_diff.items():
        column = table.columns.get(name)
        if not column:
            # XXX rate-limit
            vlog.warn("unknown column %s updating table %s"
                      % (name, table.name))
            continue

        try:
            diff_datum = data.Datum.from_json(column.type, diff_json)
        except error.Error as e:
            # XXX rate-limit
            vlog.warn("error parsing column %s in table %s: %s"
                      % (name, table.name, e))
            continue

        # Save the pre-diff value before computing the new one.
        previous[name] = row._data[name].copy()
        updated = row._data[name].diff(diff_datum)
        if updated != row._data[name]:
            row._data[name] = updated

    return previous
a7261bf7 | 834 | |
def __row_update(self, table, row, row_json):
    """Stores the column values from 'row_json' into 'row'.

    'row_json' maps column names to JSON-encoded values.  Unknown columns
    and values that fail to parse are logged and skipped.  Returns True if
    the value of at least one column whose 'alert' flag is set was
    replaced, False otherwise."""
    alerted = False
    for name, value_json in row_json.items():
        column = table.columns.get(name)
        if not column:
            # XXX rate-limit
            vlog.warn("unknown column %s updating table %s"
                      % (name, table.name))
            continue

        try:
            parsed = data.Datum.from_json(column.type, value_json)
        except error.Error as e:
            # XXX rate-limit
            vlog.warn("error parsing column %s in table %s: %s"
                      % (name, table.name, e))
            continue

        # An unchanged value is not an error: the OVSDB monitor protocol
        # always includes every value in a row.
        if parsed != row._data[name]:
            row._data[name] = parsed
            if column.alert:
                alerted = True
    return alerted
862 | ||
def __create_row(self, table, uuid):
    """Returns a new Row for 'table' identified by 'uuid' in which every
    column of the table holds the default datum for its type."""
    defaults = {column.name: ovs.db.data.Datum.default(column.type)
                for column in table.columns.values()}
    return Row(self, table, uuid, defaults)
99155935 | 868 | |
8cdf0349 BP |
869 | def __error(self): |
870 | self._session.force_reconnect() | |
871 | ||
def __txn_abort_all(self):
    """Removes every outstanding transaction and marks each one as
    Transaction.TRY_AGAIN."""
    pending = self._outstanding_txns
    while pending:
        _, txn = pending.popitem()
        txn._status = Transaction.TRY_AGAIN
8cdf0349 BP |
876 | |
877 | def __txn_process_reply(self, msg): | |
878 | txn = self._outstanding_txns.pop(msg.id, None) | |
879 | if txn: | |
880 | txn._process_reply(msg) | |
beba3d82 | 881 | return True |
8cdf0349 | 882 | |
26bb0f31 | 883 | |
8cdf0349 BP |
884 | def _uuid_to_row(atom, base): |
885 | if base.ref_table: | |
886 | return base.ref_table.rows.get(atom) | |
887 | else: | |
888 | return atom | |
889 | ||
26bb0f31 | 890 | |
def _row_to_uuid(value):
    """Returns the UUID of 'value' if it is a Row; any other value is
    returned unchanged."""
    return value.uuid if isinstance(value, Row) else value
bf6ec045 | 896 | |
26bb0f31 | 897 | |
@functools.total_ordering
class Row(object):
    """A row within an IDL.

    The client may access the following attributes directly:

    - 'uuid': a uuid.UUID object whose value is the row's database UUID.

    - An attribute for each column in the Row's table, named for the column,
      whose values are as returned by Datum.to_python() for the column's type.

      If some error occurs (e.g. the database server's idea of the column is
      different from the IDL's idea), then the attribute value is the
      "default" value returned by Datum.default() for the column's type.  (It
      is important to know this because the default value may violate
      constraints for the column's type, e.g. the default integer value is 0
      even if column constraints require the column's value to be positive.)

      When a transaction is active, column attributes may also be assigned new
      values.  Committing the transaction will then cause the new value to be
      stored into the database.

      *NOTE*: In the current implementation, the value of a column is a *copy*
      of the value in the database.  This means that modifying its value
      directly will have no useful effect.  For example, the following:
        row.mycolumn["a"] = "b"              # don't do this
      will not change anything in the database, even after commit.  To modify
      the column, instead assign the modified column value back to the column:
        d = row.mycolumn
        d["a"] = "b"
        row.mycolumn = d
"""
    def __init__(self, idl, table, uuid, data):
        # All of the explicit references to self.__dict__ below are required
        # to set real attributes without invoking self.__setattr__() (which
        # treats every assignment as a column write).
        self.__dict__["uuid"] = uuid

        self.__dict__["_idl"] = idl
        self.__dict__["_table"] = table

        # _data is the committed data.  It takes the following values:
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     exists in the committed form of the database.
        #
        #   - None, if this row is newly inserted within the active transaction
        #     and thus has no committed form.
        self.__dict__["_data"] = data

        # _changes describes changes to this row within the active transaction.
        # It takes the following values:
        #
        #   - {}, the empty dictionary, if no transaction is active or if the
        #     row has not yet been changed within this transaction.
        #
        #   - A dictionary that maps a column name to its new Datum, if an
        #     active transaction changes those columns' values.
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     is newly inserted within the active transaction.
        #
        #   - None, if this transaction deletes this row.
        self.__dict__["_changes"] = {}

        # _mutations describes changes to this row to be handled via a
        # mutate operation on the wire.  It takes the following values:
        #
        #   - {}, the empty dictionary, if no transaction is active or if the
        #     row has not yet been mutated within this transaction.
        #
        #   - A dictionary that contains two keys:
        #
        #     - "_inserts" contains a dictionary that maps column names to
        #       new keys/key-value pairs that should be inserted into the
        #       column
        #     - "_removes" contains a dictionary that maps column names to
        #       the keys/key-value pairs that should be removed from the
        #       column
        #
        #   - None, if this transaction deletes this row.
        self.__dict__["_mutations"] = {}

        # A dictionary whose keys are the names of columns that must be
        # verified as prerequisites when the transaction commits.  The values
        # in the dictionary are all None.
        self.__dict__["_prereqs"] = {}

    # Rows are ordered, compared and hashed by UUID only.
    def __lt__(self, other):
        if not isinstance(other, Row):
            return NotImplemented
        return bool(self.__dict__['uuid'] < other.__dict__['uuid'])

    def __eq__(self, other):
        if not isinstance(other, Row):
            return NotImplemented
        return bool(self.__dict__['uuid'] == other.__dict__['uuid'])

    def __hash__(self):
        return int(self.__dict__['uuid'])

    def __str__(self):
        return "{table}({data})".format(
            table=self._table.name,
            data=", ".join("{col}={val}".format(col=c, val=getattr(self, c))
                           for c in sorted(self._table.columns)))

    def __getattr__(self, column_name):
        """Returns the current value of column 'column_name' as a Python
        object.

        A value written within the active transaction takes precedence.
        Otherwise the committed value is used, with any pending inserts and
        removes recorded for set and map columns applied on top of it.

        Raises AttributeError if 'column_name' is not a column of the row's
        table, or if no value is available for it."""
        assert self._changes is not None
        assert self._mutations is not None

        try:
            column = self._table.columns[column_name]
        except KeyError:
            raise AttributeError("%s instance has no attribute '%s'" %
                                 (self.__class__.__name__, column_name))
        datum = self._changes.get(column_name)
        inserts = None
        if '_inserts' in self._mutations:
            inserts = self._mutations['_inserts'].get(column_name)
        removes = None
        if '_removes' in self._mutations:
            removes = self._mutations['_removes'].get(column_name)
        if datum is None:
            if self._data is None:
                # Row inserted by this transaction: only pending inserts can
                # supply a value.
                if inserts is None:
                    raise AttributeError("%s instance has no attribute '%s'" %
                                         (self.__class__.__name__,
                                          column_name))
                else:
                    datum = data.Datum.from_python(column.type,
                                                   inserts,
                                                   _row_to_uuid)
            elif column_name in self._data:
                datum = self._data[column_name]
                if column.type.is_set():
                    dlist = datum.as_list()
                    if inserts is not None:
                        dlist.extend(list(inserts))
                    if removes is not None:
                        removes_datum = data.Datum.from_python(column.type,
                                                               removes,
                                                               _row_to_uuid)
                        removes_list = removes_datum.as_list()
                        dlist = [x for x in dlist if x not in removes_list]
                    datum = data.Datum.from_python(column.type, dlist,
                                                   _row_to_uuid)
                elif column.type.is_map():
                    dmap = datum.to_python(_uuid_to_row)
                    if inserts is not None:
                        dmap.update(inserts)
                    if removes is not None:
                        for key in removes:
                            # A key that is both removed and re-inserted
                            # keeps its inserted value.
                            if key not in (inserts or {}):
                                dmap.pop(key, None)
                    datum = data.Datum.from_python(column.type, dmap,
                                                   _row_to_uuid)
            else:
                if inserts is None:
                    raise AttributeError("%s instance has no attribute '%s'" %
                                         (self.__class__.__name__,
                                          column_name))
                else:
                    # 'inserts' is a plain set or dict, not a Datum, so it
                    # must be converted before to_python() is called on it
                    # below.
                    datum = data.Datum.from_python(column.type,
                                                   inserts,
                                                   _row_to_uuid)

        return datum.to_python(_uuid_to_row)

    def __setattr__(self, column_name, value):
        """Writes 'value' to column 'column_name' within the active
        transaction.

        Writes to columns listed as readonly, and values that cannot be
        converted to the column's type, are logged and ignored.  Indexes
        that cover 'column_name' are updated around the write.

        A transaction must be in progress."""
        assert self._changes is not None
        assert self._idl.txn

        if ((self._table.name in self._idl.readonly) and
                (column_name in self._idl.readonly[self._table.name])):
            vlog.warn("attempting to write to readonly column %s"
                      % column_name)
            return

        column = self._table.columns[column_name]
        try:
            datum = data.Datum.from_python(column.type, value, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        # Remove prior version of the Row from the index if it has the indexed
        # column set, and the column changing is an indexed column
        if hasattr(self, column_name):
            for idx in self._table.rows.indexes.values():
                if column_name in (c.column for c in idx.columns):
                    idx.remove(self)
        self._idl.txn._write(self, column, datum)
        for idx in self._table.rows.indexes.values():
            # Only update the index if indexed columns change
            if column_name in (c.column for c in idx.columns):
                idx.add(self)

    def addvalue(self, column_name, key):
        """Records that 'key' should be inserted into column 'column_name'
        by the active transaction.

        The row is registered with the transaction first.  If 'key' cannot
        be converted to the column's type, an error is logged and nothing is
        recorded."""
        self._idl.txn._txn_rows[self.uuid] = self
        column = self._table.columns[column_name]
        try:
            data.Datum.from_python(column.type, key, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        inserts = self._mutations.setdefault('_inserts', {})
        column_value = inserts.setdefault(column_name, set())
        column_value.add(key)

    def delvalue(self, column_name, key):
        """Records that 'key' should be removed from column 'column_name'
        by the active transaction.

        The row is registered with the transaction first.  If 'key' cannot
        be converted to the column's type, an error is logged and nothing is
        recorded."""
        self._idl.txn._txn_rows[self.uuid] = self
        column = self._table.columns[column_name]
        try:
            data.Datum.from_python(column.type, key, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to delete bad value from column %s (%s)"
                     % (column_name, e))
            return
        removes = self._mutations.setdefault('_removes', {})
        column_value = removes.setdefault(column_name, set())
        column_value.add(key)

    def setkey(self, column_name, key, value):
        """Records that 'key' should map to 'value' in column 'column_name'
        after the active transaction.

        The row is registered with the transaction first.  If the pair
        cannot be converted to the column's type, an error is logged and
        nothing is recorded.  When the row has committed data for the
        column, a removal of 'key' is recorded as well so that an existing
        entry is replaced."""
        self._idl.txn._txn_rows[self.uuid] = self
        column = self._table.columns[column_name]
        try:
            data.Datum.from_python(column.type, {key: value}, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        if self._data and column_name in self._data:
            # Remove existing key/value before updating.
            removes = self._mutations.setdefault('_removes', {})
            column_value = removes.setdefault(column_name, set())
            column_value.add(key)
        inserts = self._mutations.setdefault('_inserts', {})
        column_value = inserts.setdefault(column_name, {})
        column_value[key] = value

    def delkey(self, column_name, key, value=None):
        """Records that 'key' should be removed from column 'column_name'
        by the active transaction.

        The row is registered with the transaction first.  If 'value' is
        true, the removal is recorded only when the committed data
        currently maps 'key' to 'value'; a false 'value' (including the
        default None) removes 'key' unconditionally."""
        self._idl.txn._txn_rows[self.uuid] = self
        if value:
            try:
                old_value = data.Datum.to_python(self._data[column_name],
                                                 _uuid_to_row)
            except error.Error:
                return
            if key not in old_value:
                return
            if old_value[key] != value:
                return
        removes = self._mutations.setdefault('_removes', {})
        column_value = removes.setdefault(column_name, set())
        column_value.add(key)
        return

    @classmethod
    def from_json(cls, idl, table, uuid, row_json):
        """Returns a new row for 'table' built from 'row_json', which maps
        column names to JSON-encoded values.

        Unknown columns and values that fail to parse are logged and left
        out of the new row's data."""
        data = {}
        for column_name, datum_json in row_json.items():
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s in table %s"
                          % (column_name, table.name))
                continue
            try:
                datum = ovs.db.data.Datum.from_json(column.type, datum_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue
            data[column_name] = datum
        return cls(idl, table, uuid, data)

    def verify(self, column_name):
        """Causes the original contents of column 'column_name' in this row to
        be verified as a prerequisite to completing the transaction.  That is,
        if 'column_name' changed in this row (or if this row was deleted)
        between the time that the IDL originally read its contents and the time
        that the transaction commits, then the transaction aborts and
        Transaction.commit() returns Transaction.TRY_AGAIN.

        The intention is that, to ensure that no transaction commits based on
        dirty reads, an application should call Row.verify() on each data item
        read as part of a read-modify-write operation.

        In some cases Row.verify() reduces to a no-op, because the current
        value of the column is already known:

          - If this row is a row created by the current transaction (returned
            by Transaction.insert()).

          - If the column has already been modified within the current
            transaction.

        Because of the latter property, always call Row.verify() *before*
        modifying the column, for a given read-modify-write.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if not self._data or column_name in self._changes:
            return

        self._prereqs[column_name] = None

    def delete(self):
        """Deletes this row from its table.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if self._data is None:
            # The row was inserted by this transaction, so dropping it from
            # the transaction is enough.
            del self._idl.txn._txn_rows[self.uuid]
        else:
            self._idl.txn._txn_rows[self.uuid] = self
        del self._table.rows[self.uuid]
        self.__dict__["_changes"] = None

    def fetch(self, column_name):
        """Asks the active transaction to fetch column 'column_name' of this
        row (delegates to Transaction._fetch())."""
        self._idl.txn._fetch(self, column_name)

    def increment(self, column_name):
        """Causes the transaction, when committed, to increment the value of
        'column_name' within this row by 1.  'column_name' must have an integer
        type.  After the transaction commits successfully, the client may
        retrieve the final (incremented) value of 'column_name' with
        Transaction.get_increment_new_value().

        The client could accomplish something similar by reading and writing
        and verify()ing columns.  However, increment() will never (by itself)
        cause a transaction to fail because of a verify error.

        The intended use is for incrementing the "next_cfg" column in
        the Open_vSwitch table."""
        self._idl.txn._increment(self, column_name)
1240 | ||
26bb0f31 | 1241 | |
8cdf0349 BP |
1242 | def _uuid_name_from_uuid(uuid): |
1243 | return "row%s" % str(uuid).replace("-", "_") | |
1244 | ||
26bb0f31 | 1245 | |
8cdf0349 BP |
1246 | def _where_uuid_equals(uuid): |
1247 | return [["_uuid", "==", ["uuid", str(uuid)]]] | |
1248 | ||
26bb0f31 | 1249 | |
8cdf0349 BP |
1250 | class _InsertedRow(object): |
1251 | def __init__(self, op_index): | |
1252 | self.op_index = op_index | |
1253 | self.real = None | |
1254 | ||
26bb0f31 | 1255 | |
8cdf0349 | 1256 | class Transaction(object): |
2f926787 BP |
1257 | """A transaction may modify the contents of a database by modifying the |
1258 | values of columns, deleting rows, inserting rows, or adding checks that | |
1259 | columns in the database have not changed ("verify" operations), through | |
1260 | Row methods. | |
1261 | ||
1262 | Reading and writing columns and inserting and deleting rows are all | |
1263 | straightforward. The reasons to verify columns are less obvious. | |
1264 | Verification is the key to maintaining transactional integrity. Because | |
1265 | OVSDB handles multiple clients, it can happen that between the time that | |
1266 | OVSDB client A reads a column and writes a new value, OVSDB client B has | |
1267 | written that column. Client A's write should not ordinarily overwrite | |
1268 | client B's, especially if the column in question is a "map" column that | |
1269 | contains several more or less independent data items. If client A adds a | |
1270 | "verify" operation before it writes the column, then the transaction fails | |
1271 | in case client B modifies it first. Client A will then see the new value | |
1272 | of the column and compose a new transaction based on the new contents | |
1273 | written by client B. | |
1274 | ||
1275 | When a transaction is complete, which must be before the next call to | |
1276 | Idl.run(), call Transaction.commit() or Transaction.abort(). | |
1277 | ||
1278 | The life-cycle of a transaction looks like this: | |
1279 | ||
1280 | 1. Create the transaction and record the initial sequence number: | |
1281 | ||
1282 | seqno = idl.change_seqno(idl) | |
1283 | txn = Transaction(idl) | |
1284 | ||
1285 | 2. Modify the database with Row and Transaction methods. | |
1286 | ||
1287 | 3. Commit the transaction by calling Transaction.commit(). The first call | |
1288 | to this function probably returns Transaction.INCOMPLETE. The client | |
1289 | must keep calling again as long as this remains true, calling Idl.run() in |
1290 | between to let the IDL do protocol processing. (If the client doesn't | |
1291 | have anything else to do in the meantime, it can use | |
1292 | Transaction.commit_block() to avoid having to loop itself.) | |
1293 | ||
1294 | 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno | |
1295 | to change from the saved 'seqno' (it's possible that it's already | |
1296 | changed, in which case the client should not wait at all), then start | |
1297 | over from step 1. Only a call to Idl.run() will change the return value | |
1298 | of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)""" | |
1299 | ||
8cdf0349 | 1300 | # Status values that Transaction.commit() can return. |
eda26d40 RB |
1301 | |
1302 | # Not yet committed or aborted. | |
1303 | UNCOMMITTED = "uncommitted" | |
1304 | # Transaction didn't include any changes. | |
1305 | UNCHANGED = "unchanged" | |
1306 | # Commit in progress, please wait. | |
1307 | INCOMPLETE = "incomplete" | |
1308 | # ovsdb_idl_txn_abort() called. | |
1309 | ABORTED = "aborted" | |
1310 | # Commit successful. | |
1311 | SUCCESS = "success" | |
1312 | # Commit failed because a "verify" operation | |
1313 | # reported an inconsistency, due to a network | |
1314 | # problem, or other transient failure. Wait | |
1315 | # for a change, then try again. | |
1316 | TRY_AGAIN = "try again" | |
1317 | # Server hasn't given us the lock yet. | |
1318 | NOT_LOCKED = "not locked" | |
1319 | # Commit failed due to a hard error. | |
1320 | ERROR = "error" | |
8cdf0349 BP |
1321 | |
1322 | @staticmethod | |
1323 | def status_to_string(status): | |
1324 | """Converts one of the status values that Transaction.commit() can | |
1325 | return into a human-readable string. | |
1326 | ||
1327 | (The status values are in fact such strings already, so | |
1328 | there's nothing to do.)""" | |
1329 | return status | |
1330 | ||
def __init__(self, idl):
    """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
    A given Idl may only have a single active transaction at a time.

    A Transaction may modify the contents of a database by assigning new
    values to columns (attributes of Row), deleting rows (with
    Row.delete()), or inserting rows (with Transaction.insert()).  It may
    also check that columns in the database have not changed with
    Row.verify().

    When a transaction is complete (which must be before the next call to
    Idl.run()), call Transaction.commit() or Transaction.abort()."""
    assert idl.txn is None

    # Become the IDL's active transaction.
    self.idl = idl
    idl.txn = self

    # Commit bookkeeping.
    self._status = Transaction.UNCOMMITTED
    self._request_id = None
    self._error = None
    self.dry_run = False

    # Work accumulated while the transaction is being composed.
    self._txn_rows = {}
    self._comments = []
    self._fetch_requests = []
    self._inserted_rows = {}  # Map from UUID to _InsertedRow

    # Row and column recorded for an increment operation, if any.
    self._inc_row = None
    self._inc_column = None
8cdf0349 BP |
1360 | |
def add_comment(self, comment):
    """Adds 'comment' to the end of the list of comments that will be
    passed to the OVSDB server when this transaction is committed.  (The
    comment will be committed to the OVSDB log, which "ovsdb-tool show-log"
    can print in a relatively human-readable form.)"""
    comments = self._comments
    comments.append(comment)
1367 | ||
def wait(self, poller):
    """Arranges for poll_block() to wake up immediately if this
    transaction has completed committing, that is, if its status is
    neither UNCOMMITTED nor INCOMPLETE."""
    still_pending = self._status in (Transaction.UNCOMMITTED,
                                     Transaction.INCOMPLETE)
    if still_pending:
        return
    poller.immediate_wake()
1374 | ||
def _substitute_uuids(self, json):
    """Returns 'json' with references to rows inserted by this transaction
    rewritten as named-uuid references.

    A two-element ["uuid", <valid uuid string>] sequence that names a row
    of this transaction without committed data becomes
    ["named-uuid", <name>].  Any other list or tuple is rebuilt as a list
    with the substitution applied to each element.  Everything else is
    returned unchanged."""
    if not isinstance(json, (list, tuple)):
        return json

    is_uuid_ref = (len(json) == 2
                   and json[0] == 'uuid'
                   and ovs.ovsuuid.is_valid_string(json[1]))
    if not is_uuid_ref:
        return [self._substitute_uuids(item) for item in json]

    uuid = ovs.ovsuuid.from_string(json[1])
    row = self._txn_rows.get(uuid, None)
    if row and row._data is None:
        return ["named-uuid", _uuid_name_from_uuid(uuid)]
    return json
1387 | ||
1388 | def __disassemble(self): | |
1389 | self.idl.txn = None | |
1390 | ||
0c4d144a | 1391 | for row in self._txn_rows.values(): |
8cdf0349 | 1392 | if row._changes is None: |
13973bc4 TW |
1393 | # If we add the deleted row back to rows with _changes == None |
1394 | # then __getattr__ will not work for the indexes | |
1395 | row.__dict__["_changes"] = {} | |
1396 | row.__dict__["_mutations"] = {} | |
8cdf0349 BP |
1397 | row._table.rows[row.uuid] = row |
1398 | elif row._data is None: | |
1399 | del row._table.rows[row.uuid] | |
1400 | row.__dict__["_changes"] = {} | |
a59912a0 | 1401 | row.__dict__["_mutations"] = {} |
8cdf0349 BP |
1402 | row.__dict__["_prereqs"] = {} |
1403 | self._txn_rows = {} | |
1404 | ||
def commit(self):
    """Attempts to commit 'txn'.  Returns the status of the commit
    operation, one of the following constants:

      Transaction.INCOMPLETE:

          The transaction is in progress, but not yet complete.  The caller
          should call again later, after calling Idl.run() to let the
          IDL do OVSDB protocol processing.

      Transaction.UNCHANGED:

          The transaction is complete.  (It didn't actually change the
          database, so the IDL didn't send any request to the database
          server.)

      Transaction.ABORTED:

          The caller previously called Transaction.abort().

      Transaction.SUCCESS:

          The transaction was successful.  The update made by the
          transaction (and possibly other changes made by other database
          clients) should already be visible in the IDL.

      Transaction.TRY_AGAIN:

          The transaction failed for some transient reason, e.g. because a
          "verify" operation reported an inconsistency or due to a network
          problem.  The caller should wait for a change to the database,
          then compose a new transaction, and commit the new transaction.

          Use Idl.change_seqno to wait for a change in the database.  It is
          important to use its value *before* the initial call to
          Transaction.commit() as the baseline for this purpose, because
          the change that one should wait for can happen after the initial
          call but before the call that returns Transaction.TRY_AGAIN, and
          using some other baseline value in that situation could cause an
          indefinite wait if the database rarely changes.

      Transaction.NOT_LOCKED:

          The transaction failed because the IDL has been configured to
          require a database lock (with Idl.set_lock()) but didn't
          get it yet or has already lost it.

      Transaction.ERROR:

          The transaction could not be completed.  This method itself
          returns it when a queued set mutation cannot be converted into
          a datum; Transaction.get_error() describes the failure.

    Committing a transaction rolls back all of the changes that it made to
    the IDL's copy of the database.  If the transaction commits
    successfully, then the database server will send an update and, thus,
    the IDL will be updated with the committed changes."""
    # The status can only change if we're the active transaction.
    # (Otherwise, our status will change only in Idl.run().)
    if self != self.idl.txn:
        return self._status

    # If we need a lock but don't have it, give up quickly.
    if self.idl.lock_name and not self.idl.has_lock:
        self._status = Transaction.NOT_LOCKED
        self.__disassemble()
        return self._status

    # The first element of "params" is the database name, so the reply
    # index of an operation is its position in 'operations' minus one.
    operations = [self.idl._db.name]

    # Assert that we have the required lock (avoiding a race).
    if self.idl.lock_name:
        operations.append({"op": "assert",
                           "lock": self.idl.lock_name})

    # Add prerequisites and declarations of new rows.
    for row in self._txn_rows.values():
        if row._prereqs:
            rows = {}
            columns = []
            for column_name in row._prereqs:
                columns.append(column_name)
                rows[column_name] = row._data[column_name].to_json()
            operations.append({"op": "wait",
                               "table": row._table.name,
                               "timeout": 0,
                               "where": _where_uuid_equals(row.uuid),
                               "until": "==",
                               "columns": columns,
                               "rows": [rows]})

    # Add updates.
    any_updates = False
    for row in self._txn_rows.values():
        if row._changes is None:
            if row._table.is_root:
                operations.append({"op": "delete",
                                   "table": row._table.name,
                                   "where": _where_uuid_equals(row.uuid)})
                any_updates = True
            else:
                # Let ovsdb-server decide whether to really delete it.
                pass
        elif row._changes:
            op = {"table": row._table.name}
            if row._data is None:
                op["op"] = "insert"
                op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
                any_updates = True

                # 'op' is appended below, so this is its reply index.
                op_index = len(operations) - 1
                self._inserted_rows[row.uuid] = _InsertedRow(op_index)
            else:
                op["op"] = "update"
                op["where"] = _where_uuid_equals(row.uuid)

            row_json = {}
            op["row"] = row_json

            for column_name, datum in row._changes.items():
                if row._data is not None or not datum.is_default():
                    row_json[column_name] = (
                        self._substitute_uuids(datum.to_json()))

                    # If anything really changed, consider it an update.
                    # We can't suppress not-really-changed values earlier
                    # or transactions would become nonatomic (see the big
                    # comment inside Transaction._write()).
                    if (not any_updates and row._data is not None and
                            row._data[column_name] != datum):
                        any_updates = True

            if row._data is None or row_json:
                operations.append(op)
        if row._mutations:
            addop = False
            op = {"table": row._table.name}
            op["op"] = "mutate"
            if row._data is None:
                # New row
                op["where"] = self._substitute_uuids(
                    _where_uuid_equals(row.uuid))
            else:
                # Existing row
                op["where"] = _where_uuid_equals(row.uuid)
            op["mutations"] = []
            if '_removes' in row._mutations:
                for col, dat in row._mutations['_removes'].items():
                    column = row._table.columns[col]
                    if column.type.is_map():
                        opdat = ["set"]
                        opdat.append(list(dat))
                    else:
                        opdat = ["set"]
                        inner_opdat = []
                        for ele in dat:
                            try:
                                datum = data.Datum.from_python(
                                    column.type, ele, _row_to_uuid)
                            except error.Error as e:
                                # Report a real status instead of
                                # silently returning None.
                                return self.__fail_commit(e)
                            inner_opdat.append(
                                self._substitute_uuids(datum.to_json()))
                        opdat.append(inner_opdat)
                    mutation = [col, "delete", opdat]
                    op["mutations"].append(mutation)
                    addop = True
            if '_inserts' in row._mutations:
                for col, val in row._mutations['_inserts'].items():
                    column = row._table.columns[col]
                    if column.type.is_map():
                        datum = data.Datum.from_python(column.type, val,
                                                       _row_to_uuid)
                        opdat = self._substitute_uuids(datum.to_json())
                    else:
                        opdat = ["set"]
                        inner_opdat = []
                        for ele in val:
                            try:
                                datum = data.Datum.from_python(
                                    column.type, ele, _row_to_uuid)
                            except error.Error as e:
                                return self.__fail_commit(e)
                            inner_opdat.append(
                                self._substitute_uuids(datum.to_json()))
                        opdat.append(inner_opdat)
                    mutation = [col, "insert", opdat]
                    op["mutations"].append(mutation)
                    addop = True
            if addop:
                operations.append(op)
                any_updates = True

    if self._fetch_requests:
        for fetch in self._fetch_requests:
            # Remember where the "select" reply will be found.
            fetch["index"] = len(operations) - 1
            operations.append({"op": "select",
                               "table": fetch["row"]._table.name,
                               "where": self._substitute_uuids(
                                   _where_uuid_equals(fetch["row"].uuid)),
                               "columns": [fetch["column_name"]]})
        any_updates = True

    # Add increment.
    if self._inc_row and any_updates:
        # Reply index of the "mutate"; the "select" follows it.
        self._inc_index = len(operations) - 1

        operations.append({"op": "mutate",
                           "table": self._inc_row._table.name,
                           "where": self._substitute_uuids(
                               _where_uuid_equals(self._inc_row.uuid)),
                           "mutations": [[self._inc_column, "+=", 1]]})
        operations.append({"op": "select",
                           "table": self._inc_row._table.name,
                           "where": self._substitute_uuids(
                               _where_uuid_equals(self._inc_row.uuid)),
                           "columns": [self._inc_column]})

    # Add comment.
    if self._comments:
        operations.append({"op": "comment",
                           "comment": "\n".join(self._comments)})

    # Dry run?
    if self.dry_run:
        operations.append({"op": "abort"})

    if not any_updates:
        self._status = Transaction.UNCHANGED
    else:
        msg = ovs.jsonrpc.Message.create_request("transact", operations)
        self._request_id = msg.id
        # A falsy return from send() is treated as success here.
        if not self.idl._session.send(msg):
            self.idl._outstanding_txns[self._request_id] = self
            self._status = Transaction.INCOMPLETE
        else:
            self._status = Transaction.TRY_AGAIN

    self.__disassemble()
    return self._status

def __fail_commit(self, exc):
    """Ends a commit whose request could not be assembled: records 'exc'
    as the error text (unless one is already recorded), marks the
    transaction Transaction.ERROR, rolls it back, and returns the status."""
    if self._error is None:
        self._error = str(exc)
    self._status = Transaction.ERROR
    self.__disassemble()
    return self._status
1639 | ||
def commit_block(self):
    """Commits this transaction, blocking until the commit either succeeds
    or fails.  Returns the final commit status, which may be any
    Transaction.* value other than Transaction.INCOMPLETE.

    This function calls Idl.run() on this transaction's IDL, so it may
    cause Idl.change_seqno to change."""
    status = self.commit()
    while status == Transaction.INCOMPLETE:
        # Let the IDL make protocol progress, then sleep until either the
        # IDL or this transaction has something new to report.
        self.idl.run()

        poller = ovs.poller.Poller()
        self.idl.wait(poller)
        self.wait(poller)
        poller.block()

        status = self.commit()
    return status
1658 | ||
def get_increment_new_value(self):
    """Returns the incremented value of the column that Row.increment
    registered with this transaction.

    Only valid once the transaction has committed successfully."""
    committed = self._status == Transaction.SUCCESS
    assert committed
    return self._inc_new_value
1665 | ||
def abort(self):
    """Aborts this transaction.

    If Transaction.commit() has already been called then the transaction
    might get committed anyhow."""
    self.__disassemble()
    # Only a transaction that has not reached a final state is relabelled.
    unfinished = (Transaction.UNCOMMITTED, Transaction.INCOMPLETE)
    if self._status in unfinished:
        self._status = Transaction.ABORTED
1673 | ||
def get_error(self):
    """Returns a string describing this transaction's current status,
    suitable for use in log messages.

    For any status other than Transaction.ERROR this is the status name;
    for Transaction.ERROR it is the recorded error text, or a fixed
    placeholder when no text was recorded."""
    if self._status != Transaction.ERROR:
        return Transaction.status_to_string(self._status)
    return self._error or "no error details available"
1683 | ||
def __set_error_json(self, json):
    """Records 'json', serialized to a string, as this transaction's error
    text.  Only the first error is kept; later calls are ignored."""
    if self._error is not None:
        return
    self._error = ovs.json.to_string(json)
1687 | ||
def get_insert_uuid(self, uuid):
    """Maps the provisional UUID that Transaction.insert() assigned locally
    to a new row onto the permanent UUID the database assigned to it.

    Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
    or if it was assigned by that function and then deleted by Row.delete()
    within the same transaction.  (Rows that are inserted and then deleted
    within a single transaction are never sent to the database server, so
    it never assigns them a permanent UUID.)

    This transaction must have completed successfully."""
    finished = (Transaction.SUCCESS, Transaction.UNCHANGED)
    assert self._status in finished
    inserted_row = self._inserted_rows.get(uuid)
    return inserted_row.real if inserted_row else None
1706 | ||
94fbe1aa BP |
1707 | def _increment(self, row, column): |
1708 | assert not self._inc_row | |
1709 | self._inc_row = row | |
1710 | self._inc_column = column | |
1711 | ||
80c12152 | 1712 | def _fetch(self, row, column_name): |
a0631d92 | 1713 | self._fetch_requests.append({"row": row, "column_name": column_name}) |
80c12152 | 1714 | |
8cdf0349 BP |
1715 | def _write(self, row, column, datum): |
1716 | assert row._changes is not None | |
a59912a0 | 1717 | assert row._mutations is not None |
8cdf0349 BP |
1718 | |
1719 | txn = row._idl.txn | |
1720 | ||
1721 | # If this is a write-only column and the datum being written is the | |
1722 | # same as the one already there, just skip the update entirely. This | |
1723 | # is worth optimizing because we have a lot of columns that get | |
1724 | # periodically refreshed into the database but don't actually change | |
1725 | # that often. | |
1726 | # | |
1727 | # We don't do this for read/write columns because that would break | |
1728 | # atomicity of transactions--some other client might have written a | |
1729 | # different value in that column since we read it. (But if a whole | |
1730 | # transaction only does writes of existing values, without making any | |
1731 | # real changes, we will drop the whole transaction later in | |
1732 | # ovsdb_idl_txn_commit().) | |
37520ab3 RB |
1733 | if (not column.alert and row._data and |
1734 | row._data.get(column.name) == datum): | |
8cdf0349 BP |
1735 | new_value = row._changes.get(column.name) |
1736 | if new_value is None or new_value == datum: | |
1737 | return | |
1738 | ||
1739 | txn._txn_rows[row.uuid] = row | |
330b9c9c AB |
1740 | if '_inserts' in row._mutations: |
1741 | row._mutations['_inserts'].pop(column.name, None) | |
1742 | if '_removes' in row._mutations: | |
1743 | row._mutations['_removes'].pop(column.name, None) | |
1744 | row._changes[column.name] = datum.copy() | |
8cdf0349 BP |
1745 | |
def insert(self, table, new_uuid=None):
    """Inserts and returns a new row in 'table', which must be one of the
    ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.

    The new row is assigned a provisional UUID.  If 'new_uuid' is None then
    one is randomly generated; otherwise 'new_uuid' should specify a
    randomly generated uuid.UUID not otherwise in use.  ovsdb-server will
    assign a different UUID when 'txn' is committed, but the IDL will
    replace any uses of the provisional UUID in the data to be committed by
    the UUID assigned by ovsdb-server."""
    assert self._status == Transaction.UNCOMMITTED
    provisional = uuid.uuid4() if new_uuid is None else new_uuid
    row = Row(self.idl, table, provisional, None)
    # The row is visible in the table and tracked by the transaction.
    table.rows[row.uuid] = row
    self._txn_rows[row.uuid] = row
    return row
1763 | ||
def _process_reply(self, msg):
    """Digests the JSON-RPC reply 'msg' to this transaction's "transact"
    request and sets the transaction's final status accordingly.

    An error reply, or a result that is not a JSON array, yields
    Transaction.ERROR.  Otherwise each per-operation result is classified:
    null results and "timed out" errors are soft errors (TRY_AGAIN),
    "not owner" is a lock error (NOT_LOCKED), "aborted" is ignored, and
    anything else is a hard error (ERROR).  When no operation failed, the
    increment, fetch and insert results are extracted as well."""
    if msg.type == ovs.jsonrpc.Message.T_ERROR:
        self._status = Transaction.ERROR
    elif not isinstance(msg.result, (list, tuple)):
        # XXX rate-limit
        vlog.warn('reply to "transact" is not JSON array')
        # A malformed reply is final: without a status change here the
        # transaction would stay INCOMPLETE forever.
        self.__set_error_json(msg.result)
        self._status = Transaction.ERROR
    else:
        hard_errors = False
        soft_errors = False
        lock_errors = False

        ops = msg.result
        for op in ops:
            if op is None:
                # This isn't an error in itself but indicates that some
                # prior operation failed, so make sure that we know about
                # it.
                soft_errors = True
            elif isinstance(op, dict):
                # Named 'op_error' so the imported 'error' module is not
                # shadowed.
                op_error = op.get("error")
                if op_error is not None:
                    if op_error == "timed out":
                        soft_errors = True
                    elif op_error == "not owner":
                        lock_errors = True
                    elif op_error == "aborted":
                        pass
                    else:
                        hard_errors = True
                        self.__set_error_json(op)
            else:
                hard_errors = True
                self.__set_error_json(op)
                # XXX rate-limit
                vlog.warn("operation reply is not JSON null or object")

        if not soft_errors and not hard_errors and not lock_errors:
            if self._inc_row and not self.__process_inc_reply(ops):
                hard_errors = True
            if self._fetch_requests:
                if self.__process_fetch_reply(ops):
                    self.idl.change_seqno += 1
                else:
                    hard_errors = True

            for insert in self._inserted_rows.values():
                if not self.__process_insert_reply(insert, ops):
                    hard_errors = True

        # Hard errors outrank lock errors, which outrank soft errors.
        if hard_errors:
            self._status = Transaction.ERROR
        elif lock_errors:
            self._status = Transaction.NOT_LOCKED
        elif soft_errors:
            self._status = Transaction.TRY_AGAIN
        else:
            self._status = Transaction.SUCCESS
1821 | ||
@staticmethod
def __check_json_type(json, types, name):
    """Returns True if 'json' is truthy and an instance of one of 'types';
    otherwise logs a warning that mentions 'name' and returns False.

    Any falsy value (None, but also 0 or an empty list) is reported as
    missing."""
    if not json:
        # XXX rate-limit
        vlog.warn("%s is missing" % name)
        return False
    if not isinstance(json, tuple(types)):
        # XXX rate-limit
        vlog.warn("%s has unexpected type %s" % (name, type(json)))
        return False
    return True
1834 | ||
80c12152 SA |
def __process_fetch_reply(self, ops):
    """Stores the columns returned by this transaction's "select"
    operations into the rows they were fetched for.

    'ops' is the list of per-operation results from the "transact" reply.
    Returns True if at least one column was stored.  Returns False if
    nothing was stored, if the reply is too short to contain a requested
    result, or if a "rows" member is missing or of the wrong type.  A
    result that does not hold exactly one row object is skipped."""
    update = False
    for fetch_request in self._fetch_requests:
        row = fetch_request["row"]
        column_name = fetch_request["column_name"]
        index = fetch_request["index"]
        table = row._table

        # Guard against a truncated reply instead of raising IndexError.
        if index >= len(ops):
            # XXX rate-limit
            vlog.warn("reply does not contain enough operations "
                      "for fetch (has %d, needs %d)"
                      % (len(ops), index + 1))
            return False

        # We know that this is a JSON object because the loop in
        # _process_reply() already checked.
        select = ops[index]
        fetched_rows = select.get("rows")
        if not Transaction.__check_json_type(fetched_rows, (list, tuple),
                                             '"select" reply "rows"'):
            return False
        if len(fetched_rows) != 1:
            # XXX rate-limit
            vlog.warn('"select" reply "rows" has %d elements '
                      'instead of 1' % len(fetched_rows))
            continue
        fetched_row = fetched_rows[0]
        if not Transaction.__check_json_type(fetched_row, (dict,),
                                             '"select" reply row'):
            continue

        column = table.columns.get(column_name)
        datum_json = fetched_row.get(column_name)
        datum = data.Datum.from_json(column.type, datum_json)

        row._data[column_name] = datum
        update = True

    return update
1866 | ||
8cdf0349 BP |
def __process_inc_reply(self, ops):
    """Extracts the incremented column value from the "mutate"/"select"
    pair that commit() appended for Row.increment.

    'ops' is the list of per-operation results from the "transact" reply.
    On success stores the value in self._inc_new_value and returns True;
    on any malformed or unexpected reply logs a warning and returns
    False."""
    if self._inc_index + 2 > len(ops):
        # XXX rate-limit
        vlog.warn("reply does not contain enough operations for "
                  "increment (has %d, needs %d)" %
                  (len(ops), self._inc_index + 2))
        # Bail out here; indexing 'ops' below would raise IndexError.
        return False

    # We know that this is a JSON object because the loop in
    # __process_reply() already checked.
    mutate = ops[self._inc_index]
    count = mutate.get("count")
    if not Transaction.__check_json_type(count, (int,),
                                         '"mutate" reply "count"'):
        return False
    if count != 1:
        # XXX rate-limit
        vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
        return False

    select = ops[self._inc_index + 1]
    rows = select.get("rows")
    if not Transaction.__check_json_type(rows, (list, tuple),
                                         '"select" reply "rows"'):
        return False
    if len(rows) != 1:
        # XXX rate-limit
        vlog.warn('"select" reply "rows" has %d elements '
                  'instead of 1' % len(rows))
        return False
    row = rows[0]
    if not Transaction.__check_json_type(row, (dict,),
                                         '"select" reply row'):
        return False
    column = row.get(self._inc_column)
    if not Transaction.__check_json_type(column, (int,),
                                         '"select" reply inc column'):
        return False
    self._inc_new_value = column
    return True
1906 | ||
def __process_insert_reply(self, insert, ops):
    """Reads the permanent UUID for one inserted row out of 'ops', the
    list of per-operation results, and stores it in 'insert.real'.

    Returns True on success.  Logs a warning and returns False if the
    reply is too short or its "uuid" member is missing or malformed."""
    op_count = len(ops)
    if insert.op_index >= op_count:
        # XXX rate-limit
        vlog.warn("reply does not contain enough operations "
                  "for insert (has %d, needs %d)"
                  % (op_count, insert.op_index))
        return False

    # We know that this is a JSON object because the loop in
    # __process_reply() already checked.
    json_uuid = ops[insert.op_index].get("uuid")
    uuid_ok = Transaction.__check_json_type(json_uuid, (tuple, list),
                                            '"insert" reply "uuid"')
    if not uuid_ok:
        return False

    try:
        real_uuid = ovs.ovsuuid.from_json(json_uuid)
    except error.Error:
        # XXX rate-limit
        vlog.warn('"insert" reply "uuid" is not a JSON UUID')
        return False
    else:
        insert.real = real_uuid
        return True
ad0991e6 EJ |
1932 | |
1933 | ||
class SchemaHelper(object):
    """IDL Schema helper.

    This class encapsulates the logic required to generate schemas suitable
    for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
    they are interested in using register_columns().  When finished, the
    get_idl_schema() function may be called.

    The schema in its JSON form is kept in the 'schema_json' attribute
    until get_idl_schema() consumes it."""

    def __init__(self, location=None, schema_json=None):
        """Creates a new Schema object.

        'location' file path to ovs schema. None means default location
        'schema_json' schema in json representation in memory

        Raises ValueError if both 'location' and 'schema_json' are given.
        """

        if location and schema_json:
            raise ValueError("both location and schema_json can't be "
                             "specified. it's ambiguous.")
        if schema_json is None:
            if location is None:
                location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
            schema_json = ovs.json.from_file(location)

        self.schema_json = schema_json
        # Table name -> set of column names; an empty set means "all".
        self._tables = {}
        # Table name -> list of read-only column names.
        self._readonly = {}
        self._all = False

    def register_columns(self, table, columns, readonly=None):
        """Registers interest in the given 'columns' of 'table'.  Future calls
        to get_idl_schema() will include 'table':column for each column in
        'columns'. This function automatically avoids adding duplicate entries
        to the schema.
        A subset of 'columns' can be specified as 'readonly'. The readonly
        columns are not replicated but can be fetched on-demand by the user
        with Row.fetch().

        'table' must be a string.
        'columns' must be a list of strings.
        'readonly' must be a list of strings; None (the default) means no
        read-only columns.  The read-only list given here replaces any
        previously registered for 'table'.
        """

        assert isinstance(table, str)
        assert isinstance(columns, list)

        if readonly is None:
            # A fresh list per call, so tables never share one list object.
            readonly = []

        columns = set(columns) | self._tables.get(table, set())
        self._tables[table] = columns
        self._readonly[table] = readonly

    def register_table(self, table):
        """Registers interest in all columns of 'table'.  Future calls
        to get_idl_schema() will include all columns of 'table'.

        'table' must be a string
        """
        assert isinstance(table, str)
        self._tables[table] = set()  # empty set means all columns in the table

    def register_all(self):
        """Registers interest in every column of every table."""
        self._all = True

    def get_idl_schema(self):
        """Gets a schema appropriate for the creation of an 'ovs.db.idl.Idl'
        object based on columns registered using the register_columns()
        function.

        The stored 'schema_json' is reset to None afterwards."""

        schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
        self.schema_json = None

        if not self._all:
            schema_tables = {}
            for table, columns in self._tables.items():
                schema_tables[table] = (
                    self._keep_table_columns(schema, table, columns))

            schema.tables = schema_tables
        schema.readonly = self._readonly
        return schema

    def _keep_table_columns(self, schema, table_name, columns):
        """Returns the table 'table_name' of 'schema', trimmed in place to
        the given 'columns'; an empty 'columns' keeps every column."""
        assert table_name in schema.tables
        table = schema.tables[table_name]

        if not columns:
            # empty set means all columns in the table
            return table

        new_columns = {}
        for column_name in columns:
            assert isinstance(column_name, str)
            assert column_name in table.columns

            new_columns[column_name] = table.columns[column_name]

        table.columns = new_columns
        return table