]>
Commit | Line | Data |
---|---|---|
e0edde6f | 1 | # Copyright (c) 2009, 2010, 2011, 2012 Nicira, Inc. |
99155935 BP |
2 | # |
3 | # Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | # you may not use this file except in compliance with the License. | |
5 | # You may obtain a copy of the License at: | |
6 | # | |
7 | # http://www.apache.org/licenses/LICENSE-2.0 | |
8 | # | |
9 | # Unless required by applicable law or agreed to in writing, software | |
10 | # distributed under the License is distributed on an "AS IS" BASIS, | |
11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | # See the License for the specific language governing permissions and | |
13 | # limitations under the License. | |
14 | ||
import sys
import uuid

import ovs.jsonrpc
import ovs.db.parser
import ovs.db.schema
from ovs.db import error
import ovs.ovsuuid
import ovs.poller
import ovs.vlog
24 | ||
# Module-level logger object, created under the name "idl"; every log call in
# this file goes through it.
vlog = ovs.vlog.Vlog("idl")

# Option string read by the pychecker tool (presumably disabling its class-
# and object-attribute checks for this module -- confirm against pychecker's
# documentation).
__pychecker__ = 'no-classattr no-objattrs'
28 | ||
29 | ||
99155935 BP |
class Idl:
    """Open vSwitch Database Interface Definition Language (OVSDB IDL).

    The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
    requests to an OVSDB database server and parses the responses, converting
    raw JSON into data structures that are easier for clients to digest.

    The IDL also assists with issuing database transactions.  The client
    creates a transaction, manipulates the IDL data structures, and commits or
    aborts the transaction.  The IDL then composes and issues the necessary
    JSON-RPC requests and reports to the client whether the transaction
    completed successfully.

    The client is allowed to access the following attributes directly, in a
    read-only fashion:

    - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
      to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
      annotated with a new attribute 'rows', which is a dict from a uuid.UUID
      to a Row object.

      The client may directly read and write the Row objects referenced by the
      'rows' map values.  Refer to Row for more details.

    - 'change_seqno': A number that represents the IDL's state.  When the IDL
      is updated (by Idl.run()), its value changes.  The sequence number can
      occasionally change even if the database does not.  This happens if the
      connection to the database drops and reconnects, which causes the
      database contents to be reloaded even if they didn't change.  (It could
      also happen if the database server sends out a "change" that reflects
      what the IDL already thought was in the database.  The database server is
      not supposed to do that, but bugs could in theory cause it to do so.)

    - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
      if no lock is configured.

    - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
      lock, and False otherwise.

      Locking and unlocking happens asynchronously from the database client's
      point of view, so the information is only useful for optimization
      (e.g. if the client doesn't have the lock then there's no point in trying
      to write to the database).

    - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
      the database server has indicated that some other client already owns the
      requested lock, and False otherwise.

    - 'txn': The ovs.db.idl.Transaction object for the database transaction
      currently being constructed, if there is one, or None otherwise.
    """

    def __init__(self, remote, schema):
        """Creates a connection to the database described by 'schema' on
        'remote', which should be in a form acceptable to
        ovs.jsonrpc.Session.open().  The connection will maintain an in-memory
        replica of the remote database.

        'schema' must be an instance of the SchemaHelper class (this is
        asserted); the schema actually used is the one returned by its
        get_idl_schema() method.  The caller may have cut that schema down by
        removing tables or columns that are not of interest.  The IDL will
        only replicate the tables and columns that remain.  The caller may
        also add an attribute named 'alert' to selected remaining columns,
        setting its value to False; if so, then changes to those columns will
        not be considered changes to the database for the purpose of the
        return value of Idl.run() and Idl.change_seqno.  This is useful for
        columns that the IDL's client will write but not read.

        The IDL uses and modifies the resulting schema directly."""

        assert isinstance(schema, SchemaHelper)
        schema = schema.get_idl_schema()

        self.tables = schema.tables
        self._db = schema
        self._session = ovs.jsonrpc.Session.open(remote)
        self._monitor_request_id = None
        self._last_seqno = None
        self.change_seqno = 0

        # Database locking.
        self.lock_name = None          # Name of lock we need, None if none.
        self.has_lock = False          # Has db server said we have the lock?
        self.is_lock_contended = False  # Has db server said we can't get lock?
        self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.

        # Transaction support.
        self.txn = None
        self._outstanding_txns = {}

        # Annotate the schema objects with the per-IDL state documented in the
        # class docstring.
        for table in schema.tables.itervalues():
            for column in table.columns.itervalues():
                if not hasattr(column, 'alert'):
                    column.alert = True
            table.need_table = False
            table.rows = {}
            table.idl = self

    def close(self):
        """Closes the connection to the database.  The IDL will no longer
        update."""
        self._session.close()

    def run(self):
        """Processes a batch of messages from the database server.  Returns
        True if the database as seen through the IDL changed, False if it did
        not change.  The initial fetch of the entire contents of the remote
        database is considered to be one kind of change.  If the IDL has been
        configured to acquire a database lock (with Idl.set_lock()), then
        successfully acquiring the lock is also considered to be a change.

        This function can return occasional false positives, that is, report
        that the database changed even though it didn't.  This happens if the
        connection to the database drops and reconnects, which causes the
        database contents to be reloaded even if they didn't change.  (It could
        also happen if the database server sends out a "change" that reflects
        what we already thought was in the database, but the database server is
        not supposed to do that.)

        As an alternative to checking the return value, the client may check
        for changes in self.change_seqno."""
        assert not self.txn
        initial_change_seqno = self.change_seqno
        self._session.run()

        # Handle at most 50 messages per call so that a busy server cannot
        # keep the caller inside run() indefinitely.
        for _ in range(50):
            if not self._session.is_connected():
                break

            seqno = self._session.get_seqno()
            if seqno != self._last_seqno:
                # The session's sequence number changed since we last looked:
                # abandon in-flight transactions and re-issue our monitor
                # (and, if configured, lock) requests.
                self._last_seqno = seqno
                self.__txn_abort_all()
                self.__send_monitor_request()
                if self.lock_name:
                    self.__send_lock_request()
                break

            msg = self._session.recv()
            if msg is None:
                break
            if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                and msg.method == "update"
                and len(msg.params) == 2
                and msg.params[0] is None):
                # Database contents changed.
                self.__parse_update(msg.params[1])
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                  and self._monitor_request_id is not None
                  and self._monitor_request_id == msg.id):
                # Reply to our "monitor" request.
                try:
                    self.change_seqno += 1
                    self._monitor_request_id = None
                    self.__clear()
                    self.__parse_update(msg.result)
                except error.Error:
                    # sys.exc_info() instead of "except ..., e" so that this
                    # parses on every Python version.
                    e = sys.exc_info()[1]
                    vlog.err("%s: parse error in received schema: %s"
                              % (self._session.get_name(), e))
                    self.__error()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                  and self._lock_request_id is not None
                  and self._lock_request_id == msg.id):
                # Reply to our "lock" request.
                self.__parse_lock_reply(msg.result)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                  and msg.method == "locked"):
                # We got our lock.
                self.__parse_lock_notify(msg.params, True)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                  and msg.method == "stolen"):
                # Someone else stole our lock.
                self.__parse_lock_notify(msg.params, False)
            elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
                # Reply to our echo request.  Ignore it.
                # NOTE(review): this matches T_NOTIFY although the comment
                # talks about a reply -- confirm which message type the
                # session actually delivers here.
                pass
            elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                               ovs.jsonrpc.Message.T_REPLY)
                  and self.__txn_process_reply(msg)):
                # __txn_process_reply() did everything needed.
                pass
            else:
                # This can happen if a transaction is destroyed before we
                # receive the reply, so keep the log level low.
                vlog.dbg("%s: received unexpected %s message"
                         % (self._session.get_name(),
                             ovs.jsonrpc.Message.type_to_string(msg.type)))

        return initial_change_seqno != self.change_seqno

    def wait(self, poller):
        """Arranges for poller.block() to wake up when self.run() has something
        to do or when activity occurs on a transaction on 'self'."""
        self._session.wait(poller)
        self._session.recv_wait(poller)

    def has_ever_connected(self):
        """Returns True, if the IDL successfully connected to the remote
        database and retrieved its contents (even if the connection
        subsequently dropped and is in the process of reconnecting).  If so,
        then the IDL contains an atomic snapshot of the database's contents
        (but it might be arbitrarily old if the connection dropped).

        Returns False if the IDL has never connected or retrieved the
        database's contents.  If so, the IDL is empty."""
        return self.change_seqno != 0

    def force_reconnect(self):
        """Forces the IDL to drop its connection to the database and reconnect.
        In the meantime, the contents of the IDL will not change."""
        self._session.force_reconnect()

    def set_lock(self, lock_name):
        """If 'lock_name' is not None, configures the IDL to obtain the named
        lock from the database server and to avoid modifying the database when
        the lock cannot be acquired (that is, when another client has the same
        lock).

        If 'lock_name' is None, drops the locking requirement and releases the
        lock."""
        assert not self.txn
        assert not self._outstanding_txns

        if self.lock_name and (not lock_name or lock_name != self.lock_name):
            # Release previous lock.
            self.__send_unlock_request()
            self.lock_name = None
            self.is_lock_contended = False

        if lock_name and not self.lock_name:
            # Acquire new lock.
            self.lock_name = lock_name
            self.__send_lock_request()

    def __clear(self):
        """Empties every table's 'rows' map, bumping change_seqno once if any
        table actually held rows."""
        changed = False

        for table in self.tables.itervalues():
            if table.rows:
                changed = True
                table.rows = {}

        if changed:
            self.change_seqno += 1

    def __update_has_lock(self, new_has_lock):
        """Records 'new_has_lock' in self.has_lock.  Newly gaining the lock
        clears is_lock_contended and counts as a database change unless a
        monitor reply is still pending."""
        if new_has_lock and not self.has_lock:
            if self._monitor_request_id is None:
                self.change_seqno += 1
            else:
                # We're waiting for a monitor reply, so don't signal that the
                # database changed.  The monitor reply will increment
                # change_seqno anyhow.
                pass
            self.is_lock_contended = False
        self.has_lock = new_has_lock

    def __do_send_lock_request(self, method):
        """Marks the lock as not held, then, if the session is connected,
        sends a 'method' ("lock" or "unlock") request for self.lock_name.

        Returns the JSON-RPC id of the request sent, or None if nothing was
        sent."""
        self.__update_has_lock(False)
        self._lock_request_id = None
        if self._session.is_connected():
            msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
            msg_id = msg.id
            self._session.send(msg)
        else:
            msg_id = None
        return msg_id

    def __send_lock_request(self):
        """Sends a "lock" request and remembers its id so that run() can
        recognize the reply."""
        self._lock_request_id = self.__do_send_lock_request("lock")

    def __send_unlock_request(self):
        """Sends an "unlock" request; its reply is not tracked."""
        self.__do_send_lock_request("unlock")

    def __parse_lock_reply(self, result):
        """Handles the reply to our "lock" request: we hold the lock only if
        'result' is a dict whose "locked" member is exactly True; otherwise
        the lock is marked contended."""
        self._lock_request_id = None
        got_lock = type(result) == dict and result.get("locked") is True
        self.__update_has_lock(got_lock)
        if not got_lock:
            self.is_lock_contended = True

    def __parse_lock_notify(self, params, new_has_lock):
        """Handles a "locked" ('new_has_lock' True) or "stolen"
        ('new_has_lock' False) notification.  Notifications whose first
        parameter is not our configured lock name are ignored."""
        if (self.lock_name is not None
            and type(params) in (list, tuple)
            and params
            and params[0] == self.lock_name):
            # Bound-method call: 'self' must not be passed a second time.
            self.__update_has_lock(new_has_lock)
            if not new_has_lock:
                self.is_lock_contended = True

    def __send_monitor_request(self):
        """Sends a "monitor" request covering every column of every table in
        self.tables and remembers its id so that run() can recognize the
        reply."""
        monitor_requests = {}
        for table in self.tables.itervalues():
            monitor_requests[table.name] = {"columns": table.columns.keys()}
        msg = ovs.jsonrpc.Message.create_request(
            "monitor", [self._db.name, None, monitor_requests])
        self._monitor_request_id = msg.id
        self._session.send(msg)

    def __parse_update(self, update):
        """Applies the <table-updates> object 'update' to the replica, logging
        (rather than propagating) any error.Error raised while parsing it."""
        try:
            self.__do_parse_update(update)
        except error.Error:
            e = sys.exc_info()[1]
            vlog.err("%s: error parsing update: %s"
                     % (self._session.get_name(), e))

    def __do_parse_update(self, table_updates):
        """Validates 'table_updates' and applies each contained <row-update>,
        incrementing change_seqno for every row update that changed something.

        Raises error.Error on the first malformed element; updates applied
        before that point are kept."""
        if type(table_updates) != dict:
            raise error.Error("<table-updates> is not an object",
                              table_updates)

        for table_name, table_update in table_updates.iteritems():
            table = self.tables.get(table_name)
            if not table:
                raise error.Error('<table-updates> includes unknown '
                                  'table "%s"' % table_name)

            if type(table_update) != dict:
                raise error.Error('<table-update> for table "%s" is not '
                                  'an object' % table_name, table_update)

            for uuid_string, row_update in table_update.iteritems():
                if not ovs.ovsuuid.is_valid_string(uuid_string):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains bad UUID "%s" as member '
                                      'name' % (table_name, uuid_string),
                                      table_update)
                # Named 'row_uuid' so that the imported 'uuid' module is not
                # shadowed.
                row_uuid = ovs.ovsuuid.from_string(uuid_string)

                if type(row_update) != dict:
                    raise error.Error('<table-update> for table "%s" '
                                      'contains <row-update> for %s that '
                                      'is not an object'
                                      % (table_name, uuid_string))

                parser = ovs.db.parser.Parser(row_update, "row-update")
                old = parser.get_optional("old", [dict])
                new = parser.get_optional("new", [dict])
                parser.finish()

                if not old and not new:
                    raise error.Error('<row-update> missing "old" and '
                                      '"new" members', row_update)

                if self.__process_update(table, row_uuid, old, new):
                    self.change_seqno += 1

    def __process_update(self, table, uuid, old, new):
        """Applies one row update to 'table'.  No 'new' means delete, no 'old'
        means insert, and both present means modify.

        Returns True if a column changed, False otherwise."""
        row = table.rows.get(uuid)
        changed = False
        if not new:
            # Delete row.
            if row:
                del table.rows[uuid]
                changed = True
            else:
                # XXX rate-limit
                vlog.warn("cannot delete missing row %s from table %s"
                          % (uuid, table.name))
        elif not old:
            # Insert row.
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
            else:
                # XXX rate-limit
                vlog.warn("cannot add existing row %s to table %s"
                          % (uuid, table.name))
            if self.__row_update(table, row, new):
                changed = True
        else:
            # Modify row; a missing row is created so the update still lands.
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
                # XXX rate-limit
                vlog.warn("cannot modify missing row %s in table %s"
                          % (uuid, table.name))
            if self.__row_update(table, row, new):
                changed = True
        return changed

    def __row_update(self, table, row, row_json):
        """Stores the column values in 'row_json' into 'row'.  Unknown columns
        and unparseable values are logged and skipped.

        Returns True if the value of any column whose 'alert' attribute is
        true changed, False otherwise."""
        changed = False
        for column_name, datum_json in row_json.iteritems():
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum = ovs.db.data.Datum.from_json(column.type, datum_json)
            except error.Error:
                e = sys.exc_info()[1]
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            if datum != row._data[column_name]:
                row._data[column_name] = datum
                if column.alert:
                    changed = True
            else:
                # Didn't really change but the OVSDB monitor protocol always
                # includes every value in a row.
                pass
        return changed

    def __create_row(self, table, uuid):
        """Adds to 'table' a new Row for 'uuid' in which every column holds
        its type's default Datum, and returns that Row."""
        data = {}
        for column in table.columns.itervalues():
            data[column.name] = ovs.db.data.Datum.default(column.type)
        row = table.rows[uuid] = Row(self, table, uuid, data)
        return row

    def __error(self):
        """Recovers from a protocol-level error by forcing the session to
        reconnect."""
        self._session.force_reconnect()

    def __txn_abort_all(self):
        """Forgets every outstanding transaction, marking each one
        Transaction.TRY_AGAIN."""
        while self._outstanding_txns:
            txn = self._outstanding_txns.popitem()[1]
            txn._status = Transaction.TRY_AGAIN

    def __txn_process_reply(self, msg):
        """Hands 'msg' to the outstanding transaction whose request id matches
        msg.id, if there is one.

        Returns True if a transaction consumed the message, False otherwise,
        so that run() can tell handled replies from unexpected ones."""
        txn = self._outstanding_txns.pop(msg.id, None)
        if txn:
            txn._process_reply(msg)
            return True
        return False
462 | ||
26bb0f31 | 463 | |
8cdf0349 BP |
464 | def _uuid_to_row(atom, base): |
465 | if base.ref_table: | |
466 | return base.ref_table.rows.get(atom) | |
467 | else: | |
468 | return atom | |
469 | ||
26bb0f31 | 470 | |
8cdf0349 BP |
def _row_to_uuid(value):
    """Returns value.uuid if 'value' is exactly a Row (subclasses are not
    matched), and 'value' unchanged otherwise."""
    if type(value) != Row:
        return value
    return value.uuid
bf6ec045 | 476 | |
26bb0f31 | 477 | |
class Row(object):
    """A row within an IDL.

    The client may access the following attributes directly:

    - 'uuid': a uuid.UUID object whose value is the row's database UUID.

    - An attribute for each column in the Row's table, named for the column,
      whose values are as returned by Datum.to_python() for the column's type.

      If some error occurs (e.g. the database server's idea of the column is
      different from the IDL's idea), then the attribute value is the
      "default" value returned by Datum.default() for the column's type.  (It
      is important to know this because the default value may violate
      constraints for the column's type, e.g. the default integer value is 0
      even if column constraints require the column's value to be positive.)

      When a transaction is active, column attributes may also be assigned new
      values.  Committing the transaction will then cause the new value to be
      stored into the database.

      *NOTE*: In the current implementation, the value of a column is a *copy*
      of the value in the database.  This means that modifying its value
      directly will have no useful effect.  For example, the following:
        row.mycolumn["a"] = "b"              # don't do this
      will not change anything in the database, even after commit.  To modify
      the column, instead assign the modified column value back to the column:
        d = row.mycolumn
        d["a"] = "b"
        row.mycolumn = d
"""
    def __init__(self, idl, table, uuid, data):
        # All of the explicit references to self.__dict__ below are required
        # to set real attributes without invoking self.__setattr__(), which
        # treats every name as a column and requires an active transaction.
        self.__dict__["uuid"] = uuid

        self.__dict__["_idl"] = idl
        self.__dict__["_table"] = table

        # _data is the committed data.  It takes the following values:
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     exists in the committed form of the database.
        #
        #   - None, if this row is newly inserted within the active transaction
        #     and thus has no committed form.
        self.__dict__["_data"] = data

        # _changes describes changes to this row within the active transaction.
        # It takes the following values:
        #
        #   - {}, the empty dictionary, if no transaction is active or if the
        #     row has yet not been changed within this transaction.
        #
        #   - A dictionary that maps a column name to its new Datum, if an
        #     active transaction changes those columns' values.
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     is newly inserted within the active transaction.
        #
        #   - None, if this transaction deletes this row.
        self.__dict__["_changes"] = {}

        # A dictionary whose keys are the names of columns that must be
        # verified as prerequisites when the transaction commits.  The values
        # in the dictionary are all None.
        self.__dict__["_prereqs"] = {}

    def __getattr__(self, column_name):
        """Returns the Python value of column 'column_name', preferring a
        value written within the active transaction over the committed one.

        Raises AttributeError if the row holds no value under that name, so
        that getattr() with a default and similar attribute probes behave as
        they do for ordinary objects."""
        assert self._changes is not None

        datum = self._changes.get(column_name)
        if datum is None:
            # _data is None for a row inserted within the active transaction.
            if self._data is None or column_name not in self._data:
                raise AttributeError("%s instance has no attribute '%s'"
                                     % (self.__class__.__name__, column_name))
            datum = self._data[column_name]

        return datum.to_python(_uuid_to_row)

    def __setattr__(self, column_name, value):
        """Writes 'value' to column 'column_name' within the active
        transaction.  A value that cannot be converted to the column's type
        is logged and otherwise ignored.

        A transaction must be in progress."""
        assert self._changes is not None
        assert self._idl.txn

        column = self._table.columns[column_name]
        try:
            datum = ovs.db.data.Datum.from_python(column.type, value,
                                                  _row_to_uuid)
        except error.Error:
            # sys.exc_info() instead of "except ..., e" so that this parses
            # on every Python version.
            e = sys.exc_info()[1]
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        self._idl.txn._write(self, column, datum)

    def verify(self, column_name):
        """Causes the original contents of column 'column_name' in this row to
        be verified as a prerequisite to completing the transaction.  That is,
        if 'column_name' changed in this row (or if this row was deleted)
        between the time that the IDL originally read its contents and the time
        that the transaction commits, then the transaction aborts and
        Transaction.commit() returns Transaction.TRY_AGAIN.

        The intention is that, to ensure that no transaction commits based on
        dirty reads, an application should call Row.verify() on each data item
        read as part of a read-modify-write operation.

        In some cases Row.verify() reduces to a no-op, because the current
        value of the column is already known:

          - If this row is a row created by the current transaction (returned
            by Transaction.insert()).

          - If the column has already been modified within the current
            transaction.

        Because of the latter property, always call Row.verify() *before*
        modifying the column, for a given read-modify-write.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if not self._data or column_name in self._changes:
            return

        self._prereqs[column_name] = None

    def delete(self):
        """Deletes this row from its table.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if self._data is None:
            # The row was inserted by this same transaction, so it only needs
            # to be dropped from the transaction's row map.
            del self._idl.txn._txn_rows[self.uuid]
        self.__dict__["_changes"] = None
        del self._table.rows[self.uuid]

    def increment(self, column_name):
        """Causes the transaction, when committed, to increment the value of
        'column_name' within this row by 1.  'column_name' must have an integer
        type.  After the transaction commits successfully, the client may
        retrieve the final (incremented) value of 'column_name' with
        Transaction.get_increment_new_value().

        The client could accomplish something similar by reading and writing
        and verify()ing columns.  However, increment() will never (by itself)
        cause a transaction to fail because of a verify error.

        The intended use is for incrementing the "next_cfg" column in
        the Open_vSwitch table."""
        self._idl.txn._increment(self, column_name)
627 | ||
26bb0f31 | 628 | |
8cdf0349 BP |
629 | def _uuid_name_from_uuid(uuid): |
630 | return "row%s" % str(uuid).replace("-", "_") | |
631 | ||
26bb0f31 | 632 | |
8cdf0349 BP |
633 | def _where_uuid_equals(uuid): |
634 | return [["_uuid", "==", ["uuid", str(uuid)]]] | |
635 | ||
26bb0f31 | 636 | |
8cdf0349 BP |
637 | class _InsertedRow(object): |
638 | def __init__(self, op_index): | |
639 | self.op_index = op_index | |
640 | self.real = None | |
641 | ||
26bb0f31 | 642 | |
8cdf0349 | 643 | class Transaction(object): |
2f926787 BP |
644 | """A transaction may modify the contents of a database by modifying the |
645 | values of columns, deleting rows, inserting rows, or adding checks that | |
646 | columns in the database have not changed ("verify" operations), through | |
647 | Row methods. | |
648 | ||
649 | Reading and writing columns and inserting and deleting rows are all | |
650 | straightforward. The reasons to verify columns are less obvious. | |
651 | Verification is the key to maintaining transactional integrity. Because | |
652 | OVSDB handles multiple clients, it can happen that between the time that | |
653 | OVSDB client A reads a column and writes a new value, OVSDB client B has | |
654 | written that column. Client A's write should not ordinarily overwrite | |
655 | client B's, especially if the column in question is a "map" column that | |
656 | contains several more or less independent data items. If client A adds a | |
657 | "verify" operation before it writes the column, then the transaction fails | |
658 | in case client B modifies it first. Client A will then see the new value | |
659 | of the column and compose a new transaction based on the new contents | |
660 | written by client B. | |
661 | ||
662 | When a transaction is complete, which must be before the next call to | |
663 | Idl.run(), call Transaction.commit() or Transaction.abort(). | |
664 | ||
665 | The life-cycle of a transaction looks like this: | |
666 | ||
667 | 1. Create the transaction and record the initial sequence number: | |
668 | ||
669 | seqno = idl.change_seqno(idl) | |
670 | txn = Transaction(idl) | |
671 | ||
672 | 2. Modify the database with Row and Transaction methods. | |
673 | ||
674 | 3. Commit the transaction by calling Transaction.commit(). The first call | |
675 | to this function probably returns Transaction.INCOMPLETE. The client | |
676 | must keep calling again along as this remains true, calling Idl.run() in | |
677 | between to let the IDL do protocol processing. (If the client doesn't | |
678 | have anything else to do in the meantime, it can use | |
679 | Transaction.commit_block() to avoid having to loop itself.) | |
680 | ||
681 | 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno | |
682 | to change from the saved 'seqno' (it's possible that it's already | |
683 | changed, in which case the client should not wait at all), then start | |
684 | over from step 1. Only a call to Idl.run() will change the return value | |
685 | of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)""" | |
686 | ||
8cdf0349 | 687 | # Status values that Transaction.commit() can return. |
26bb0f31 EJ |
688 | UNCOMMITTED = "uncommitted" # Not yet committed or aborted. |
689 | UNCHANGED = "unchanged" # Transaction didn't include any changes. | |
690 | INCOMPLETE = "incomplete" # Commit in progress, please wait. | |
691 | ABORTED = "aborted" # ovsdb_idl_txn_abort() called. | |
692 | SUCCESS = "success" # Commit successful. | |
854a94d9 | 693 | TRY_AGAIN = "try again" # Commit failed because a "verify" operation |
26bb0f31 | 694 | # reported an inconsistency, due to a network |
4fdfe5cc BP |
695 | # problem, or other transient failure. Wait |
696 | # for a change, then try again. | |
26bb0f31 EJ |
697 | NOT_LOCKED = "not locked" # Server hasn't given us the lock yet. |
698 | ERROR = "error" # Commit failed due to a hard error. | |
8cdf0349 BP |
699 | |
700 | @staticmethod | |
701 | def status_to_string(status): | |
702 | """Converts one of the status values that Transaction.commit() can | |
703 | return into a human-readable string. | |
704 | ||
705 | (The status values are in fact such strings already, so | |
706 | there's nothing to do.)""" | |
707 | return status | |
708 | ||
709 | def __init__(self, idl): | |
710 | """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl). | |
711 | A given Idl may only have a single active transaction at a time. | |
712 | ||
713 | A Transaction may modify the contents of a database by assigning new | |
714 | values to columns (attributes of Row), deleting rows (with | |
715 | Row.delete()), or inserting rows (with Transaction.insert()). It may | |
716 | also check that columns in the database have not changed with | |
717 | Row.verify(). | |
718 | ||
719 | When a transaction is complete (which must be before the next call to | |
720 | Idl.run()), call Transaction.commit() or Transaction.abort().""" | |
721 | assert idl.txn is None | |
722 | ||
723 | idl.txn = self | |
724 | self._request_id = None | |
725 | self.idl = idl | |
726 | self.dry_run = False | |
727 | self._txn_rows = {} | |
728 | self._status = Transaction.UNCOMMITTED | |
729 | self._error = None | |
730 | self._comments = [] | |
4fdfe5cc | 731 | self._commit_seqno = self.idl.change_seqno |
8cdf0349 | 732 | |
94fbe1aa | 733 | self._inc_row = None |
8cdf0349 | 734 | self._inc_column = None |
8cdf0349 | 735 | |
26bb0f31 | 736 | self._inserted_rows = {} # Map from UUID to _InsertedRow |
8cdf0349 BP |
737 | |
738 | def add_comment(self, comment): | |
739 | """Appens 'comment' to the comments that will be passed to the OVSDB | |
740 | server when this transaction is committed. (The comment will be | |
741 | committed to the OVSDB log, which "ovsdb-tool show-log" can print in a | |
742 | relatively human-readable form.)""" | |
743 | self._comments.append(comment) | |
744 | ||
8cdf0349 | 745 | def wait(self, poller): |
2f926787 BP |
746 | """Causes poll_block() to wake up if this transaction has completed |
747 | committing.""" | |
8cdf0349 BP |
748 | if self._status not in (Transaction.UNCOMMITTED, |
749 | Transaction.INCOMPLETE): | |
750 | poller.immediate_wake() | |
751 | ||
752 | def _substitute_uuids(self, json): | |
753 | if type(json) in (list, tuple): | |
754 | if (len(json) == 2 | |
755 | and json[0] == 'uuid' | |
756 | and ovs.ovsuuid.is_valid_string(json[1])): | |
757 | uuid = ovs.ovsuuid.from_string(json[1]) | |
758 | row = self._txn_rows.get(uuid, None) | |
759 | if row and row._data is None: | |
760 | return ["named-uuid", _uuid_name_from_uuid(uuid)] | |
761 | return json | |
762 | ||
def __disassemble(self):
    """Detaches this transaction from its Idl and undoes the provisional
    changes it made to the IDL's rows."""
    self.idl.txn = None

    for txn_row in self._txn_rows.itervalues():
        if txn_row._changes is None:
            # '_changes' of None: put the row back into its table.
            txn_row._table.rows[txn_row.uuid] = txn_row
        elif txn_row._data is None:
            # '_data' of None: take the row out of its table.
            del txn_row._table.rows[txn_row.uuid]
        # Write through __dict__ directly, as the rest of this method does
        # not rely on ordinary attribute assignment for these two fields.
        txn_row.__dict__["_changes"] = {}
        txn_row.__dict__["_prereqs"] = {}
    self._txn_rows = {}
774 | ||
def commit(self):
    """Attempts to commit this transaction.  Returns the status of the
    commit operation, one of the following constants:

      Transaction.INCOMPLETE:

          The transaction is in progress, but not yet complete.  The caller
          should call again later, after calling Idl.run() to let the
          IDL do OVSDB protocol processing.

      Transaction.UNCHANGED:

          The transaction is complete.  (It didn't actually change the
          database, so the IDL didn't send any request to the database
          server.)

      Transaction.ABORTED:

          The caller previously called Transaction.abort().

      Transaction.SUCCESS:

          The transaction was successful.  The update made by the
          transaction (and possibly other changes made by other database
          clients) should already be visible in the IDL.

      Transaction.TRY_AGAIN:

          The transaction failed for some transient reason, e.g. because a
          "verify" operation reported an inconsistency or due to a network
          problem.  The caller should wait for a change to the database,
          then compose a new transaction, and commit the new transaction.

          Use Idl.change_seqno to wait for a change in the database.  It is
          important to use its value *before* the initial call to
          Transaction.commit() as the baseline for this purpose, because
          the change that one should wait for can happen after the initial
          call but before the call that returns Transaction.TRY_AGAIN, and
          using some other baseline value in that situation could cause an
          indefinite wait if the database rarely changes.

      Transaction.NOT_LOCKED:

          The transaction failed because the IDL has been configured to
          require a database lock (with Idl.set_lock()) but didn't
          get it yet or has already lost it.

      Transaction.ERROR:

          The transaction failed due to a hard error.  Transaction.get_error()
          returns whatever details are available.

    Committing a transaction rolls back all of the changes that it made to
    the IDL's copy of the database.  If the transaction commits
    successfully, then the database server will send an update and, thus,
    the IDL will be updated with the committed changes."""
    # The status can only change if we're the active transaction.
    # (Otherwise, our status will change only in Idl.run().)
    if self != self.idl.txn:
        return self._status

    # If we need a lock but don't have it, give up quickly.
    if self.idl.lock_name and not self.idl.has_lock():
        self._status = Transaction.NOT_LOCKED
        self.__disassemble()
        return self._status

    # The first element is the database name, not an operation.  Because of
    # it, "len(operations) - 1" taken just before an operation is appended
    # is that operation's position in the reply's result array.
    operations = [self.idl._db.name]

    # Assert that we have the required lock (avoiding a race).
    if self.idl.lock_name:
        operations.append({"op": "assert",
                           "lock": self.idl.lock_name})

    # Add prerequisites and declarations of new rows.
    for row in self._txn_rows.itervalues():
        if row._prereqs:
            rows = {}
            columns = []
            for column_name in row._prereqs:
                columns.append(column_name)
                rows[column_name] = row._data[column_name].to_json()
            operations.append({"op": "wait",
                               "table": row._table.name,
                               "timeout": 0,
                               "where": _where_uuid_equals(row.uuid),
                               "until": "==",
                               "columns": columns,
                               "rows": [rows]})

    # Add updates.
    any_updates = False
    for row in self._txn_rows.itervalues():
        if row._changes is None:
            # '_changes' of None marks a row deleted by this transaction.
            if row._table.is_root:
                operations.append({"op": "delete",
                                   "table": row._table.name,
                                   "where": _where_uuid_equals(row.uuid)})
                any_updates = True
            else:
                # Let ovsdb-server decide whether to really delete it.
                pass
        elif row._changes:
            op = {"table": row._table.name}
            if row._data is None:
                # '_data' of None marks a row inserted by this transaction.
                op["op"] = "insert"
                op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
                any_updates = True

                # Remember where the insert's reply will be, so that the
                # server-assigned UUID can be extracted from it later.
                op_index = len(operations) - 1
                self._inserted_rows[row.uuid] = _InsertedRow(op_index)
            else:
                op["op"] = "update"
                op["where"] = _where_uuid_equals(row.uuid)

            row_json = {}
            op["row"] = row_json

            for column_name, datum in row._changes.iteritems():
                # For an inserted row, columns left at their default value
                # are omitted from the request.
                if row._data is not None or not datum.is_default():
                    row_json[column_name] = (
                        self._substitute_uuids(datum.to_json()))

                    # If anything really changed, consider it an update.
                    # We can't suppress not-really-changed values earlier
                    # or transactions would become nonatomic (see the big
                    # comment inside Transaction._write()).
                    if (not any_updates and row._data is not None and
                        row._data[column_name] != datum):
                        any_updates = True

            if row._data is None or row_json:
                operations.append(op)

    # Add increment.
    if self._inc_row and any_updates:
        # Reply position of the "mutate" below; the "select" follows it.
        self._inc_index = len(operations) - 1

        operations.append({"op": "mutate",
                           "table": self._inc_row._table.name,
                           "where": self._substitute_uuids(
                               _where_uuid_equals(self._inc_row.uuid)),
                           "mutations": [[self._inc_column, "+=", 1]]})
        operations.append({"op": "select",
                           "table": self._inc_row._table.name,
                           "where": self._substitute_uuids(
                               _where_uuid_equals(self._inc_row.uuid)),
                           "columns": [self._inc_column]})

    # Add comment.
    if self._comments:
        operations.append({"op": "comment",
                           "comment": "\n".join(self._comments)})

    # Dry run?
    if self.dry_run:
        operations.append({"op": "abort"})

    if not any_updates:
        # Nothing would change, so no request is sent at all.
        self._status = Transaction.UNCHANGED
    else:
        msg = ovs.jsonrpc.Message.create_request("transact", operations)
        self._request_id = msg.id
        # A false return value from send() is treated as success: the
        # transaction is registered so that its reply can be matched up by
        # request id.  Any other return value is treated as a transient
        # failure.
        if not self.idl._session.send(msg):
            self.idl._outstanding_txns[self._request_id] = self
            self._status = Transaction.INCOMPLETE
        else:
            self._status = Transaction.TRY_AGAIN

    self.__disassemble()
    return self._status
941 | ||
942 | def commit_block(self): | |
2f926787 BP |
943 | """Attempts to commit this transaction, blocking until the commit |
944 | either succeeds or fails. Returns the final commit status, which may | |
945 | be any Transaction.* value other than Transaction.INCOMPLETE. | |
946 | ||
947 | This function calls Idl.run() on this transaction'ss IDL, so it may | |
948 | cause Idl.change_seqno to change.""" | |
8cdf0349 BP |
949 | while True: |
950 | status = self.commit() | |
951 | if status != Transaction.INCOMPLETE: | |
952 | return status | |
953 | ||
954 | self.idl.run() | |
955 | ||
956 | poller = ovs.poller.Poller() | |
957 | self.idl.wait(poller) | |
958 | self.wait(poller) | |
959 | poller.block() | |
960 | ||
961 | def get_increment_new_value(self): | |
2f926787 BP |
962 | """Returns the final (incremented) value of the column in this |
963 | transaction that was set to be incremented by Row.increment. This | |
964 | transaction must have committed successfully.""" | |
8cdf0349 BP |
965 | assert self._status == Transaction.SUCCESS |
966 | return self._inc_new_value | |
967 | ||
def abort(self):
    """Aborts this transaction.  If Transaction.commit() has already been
    called, the transaction might get committed anyhow."""
    self.__disassemble()
    # Only a transaction that has not reached a final status is marked
    # aborted; a final status that was already recorded is left alone.
    not_finished = (self._status == Transaction.UNCOMMITTED
                    or self._status == Transaction.INCOMPLETE)
    if not_finished:
        self._status = Transaction.ABORTED
975 | ||
976 | def get_error(self): | |
977 | """Returns a string representing this transaction's current status, | |
978 | suitable for use in log messages.""" | |
979 | if self._status != Transaction.ERROR: | |
980 | return Transaction.status_to_string(self._status) | |
981 | elif self._error: | |
982 | return self._error | |
983 | else: | |
984 | return "no error details available" | |
985 | ||
986 | def __set_error_json(self, json): | |
987 | if self._error is None: | |
988 | self._error = ovs.json.to_string(json) | |
989 | ||
990 | def get_insert_uuid(self, uuid): | |
991 | """Finds and returns the permanent UUID that the database assigned to a | |
992 | newly inserted row, given the UUID that Transaction.insert() assigned | |
993 | locally to that row. | |
994 | ||
995 | Returns None if 'uuid' is not a UUID assigned by Transaction.insert() | |
996 | or if it was assigned by that function and then deleted by Row.delete() | |
997 | within the same transaction. (Rows that are inserted and then deleted | |
998 | within a single transaction are never sent to the database server, so | |
999 | it never assigns them a permanent UUID.) | |
1000 | ||
1001 | This transaction must have completed successfully.""" | |
1002 | assert self._status in (Transaction.SUCCESS, | |
1003 | Transaction.UNCHANGED) | |
1004 | inserted_row = self._inserted_rows.get(uuid) | |
1005 | if inserted_row: | |
1006 | return inserted_row.real | |
1007 | return None | |
1008 | ||
94fbe1aa BP |
1009 | def _increment(self, row, column): |
1010 | assert not self._inc_row | |
1011 | self._inc_row = row | |
1012 | self._inc_column = column | |
1013 | ||
8cdf0349 BP |
1014 | def _write(self, row, column, datum): |
1015 | assert row._changes is not None | |
1016 | ||
1017 | txn = row._idl.txn | |
1018 | ||
1019 | # If this is a write-only column and the datum being written is the | |
1020 | # same as the one already there, just skip the update entirely. This | |
1021 | # is worth optimizing because we have a lot of columns that get | |
1022 | # periodically refreshed into the database but don't actually change | |
1023 | # that often. | |
1024 | # | |
1025 | # We don't do this for read/write columns because that would break | |
1026 | # atomicity of transactions--some other client might have written a | |
1027 | # different value in that column since we read it. (But if a whole | |
1028 | # transaction only does writes of existing values, without making any | |
1029 | # real changes, we will drop the whole transaction later in | |
1030 | # ovsdb_idl_txn_commit().) | |
1031 | if not column.alert and row._data.get(column.name) == datum: | |
1032 | new_value = row._changes.get(column.name) | |
1033 | if new_value is None or new_value == datum: | |
1034 | return | |
1035 | ||
1036 | txn._txn_rows[row.uuid] = row | |
1037 | row._changes[column.name] = datum.copy() | |
1038 | ||
def insert(self, table, new_uuid=None):
    """Inserts and returns a new row in 'table', which must be one of the
    ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.

    The new row is assigned a provisional UUID.  If 'new_uuid' is None then
    one is randomly generated; otherwise 'new_uuid' should specify a
    randomly generated uuid.UUID not otherwise in use.  ovsdb-server will
    assign a different UUID when this transaction is committed, but the IDL
    will replace any uses of the provisional UUID in the data to be
    committed by the UUID assigned by ovsdb-server."""
    assert self._status == Transaction.UNCOMMITTED
    provisional = uuid.uuid4() if new_uuid is None else new_uuid
    # The new row is created without any committed data.
    new_row = Row(self.idl, table, provisional, None)
    key = new_row.uuid
    table.rows[key] = new_row
    self._txn_rows[key] = new_row
    return new_row
1056 | ||
def _process_reply(self, msg):
    """Sets this transaction's final status from 'msg', the JSON-RPC reply
    to the "transact" request sent by Transaction.commit().

    The resulting status is:

      - Transaction.ERROR if 'msg' is an error reply, if its result is not
        a JSON array, if any operation failed with an unrecognized error,
        or if the increment or insert replies are malformed.

      - Transaction.NOT_LOCKED if an operation failed with "not owner".

      - Transaction.TRY_AGAIN if an operation timed out or was skipped.

      - Transaction.SUCCESS otherwise."""
    if msg.type == ovs.jsonrpc.Message.T_ERROR:
        self._status = Transaction.ERROR
    elif type(msg.result) not in (list, tuple):
        # XXX rate-limit
        vlog.warn('reply to "transact" is not JSON array')
        # A malformed reply is still a final answer.  Without a final
        # status the transaction would stay INCOMPLETE forever and
        # Transaction.commit_block() would never return.
        self._status = Transaction.ERROR
    else:
        hard_errors = False
        soft_errors = False
        lock_errors = False

        ops = msg.result
        for op in ops:
            if op is None:
                # This isn't an error in itself but indicates that some
                # prior operation failed, so make sure that we know about
                # it.
                soft_errors = True
            elif type(op) == dict:
                # Named 'op_error' rather than 'error' to avoid shadowing
                # the 'error' module imported at the top of the file.
                op_error = op.get("error")
                if op_error is not None:
                    if op_error == "timed out":
                        soft_errors = True
                    elif op_error == "not owner":
                        lock_errors = True
                    elif op_error == "aborted":
                        # Not counted as a failure of any kind.
                        pass
                    else:
                        hard_errors = True
                        self.__set_error_json(op)
            else:
                hard_errors = True
                self.__set_error_json(op)
                # XXX rate-limit
                vlog.warn("operation reply is not JSON null or object")

        # Only look inside individual replies once every operation is
        # known to have succeeded.
        if not soft_errors and not hard_errors and not lock_errors:
            if self._inc_row and not self.__process_inc_reply(ops):
                hard_errors = True

            for insert in self._inserted_rows.itervalues():
                if not self.__process_insert_reply(insert, ops):
                    hard_errors = True

        if hard_errors:
            self._status = Transaction.ERROR
        elif lock_errors:
            self._status = Transaction.NOT_LOCKED
        elif soft_errors:
            self._status = Transaction.TRY_AGAIN
        else:
            self._status = Transaction.SUCCESS
1109 | ||
1110 | @staticmethod | |
1111 | def __check_json_type(json, types, name): | |
1112 | if not json: | |
1113 | # XXX rate-limit | |
3a656eaf | 1114 | vlog.warn("%s is missing" % name) |
8cdf0349 BP |
1115 | return False |
1116 | elif type(json) not in types: | |
1117 | # XXX rate-limit | |
3a656eaf | 1118 | vlog.warn("%s has unexpected type %s" % (name, type(json))) |
8cdf0349 BP |
1119 | return False |
1120 | else: | |
1121 | return True | |
1122 | ||
def __process_inc_reply(self, ops):
    """Extracts the new value of the incremented column from 'ops', the
    list of operation replies to this transaction's "transact" request.

    The "mutate" reply is expected at index self._inc_index and the
    "select" reply immediately after it.  On success, stores the new value
    in self._inc_new_value and returns True.  If the replies are missing or
    malformed, logs a warning and returns False."""
    if self._inc_index + 2 > len(ops):
        # XXX rate-limit
        vlog.warn("reply does not contain enough operations for "
                  "increment (has %d, needs %d)" %
                  (len(ops), self._inc_index + 2))
        # The replies indexed below do not exist, so stop here rather than
        # raising IndexError.
        return False

    # We know that this is a JSON object because the loop in
    # _process_reply() already checked.
    mutate = ops[self._inc_index]
    count = mutate.get("count")
    if not Transaction.__check_json_type(count, (int, long),
                                         '"mutate" reply "count"'):
        return False
    if count != 1:
        # XXX rate-limit
        vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
        return False

    select = ops[self._inc_index + 1]
    rows = select.get("rows")
    if not Transaction.__check_json_type(rows, (list, tuple),
                                         '"select" reply "rows"'):
        return False
    if len(rows) != 1:
        # XXX rate-limit
        vlog.warn('"select" reply "rows" has %d elements '
                  'instead of 1' % len(rows))
        return False
    row = rows[0]
    if not Transaction.__check_json_type(row, (dict,),
                                         '"select" reply row'):
        return False
    column = row.get(self._inc_column)
    if not Transaction.__check_json_type(column, (int, long),
                                         '"select" reply inc column'):
        return False
    self._inc_new_value = column
    return True
1162 | ||
def __process_insert_reply(self, insert, ops):
    """Extracts the UUID that the server assigned to an inserted row from
    'ops', the list of operation replies to this transaction's "transact"
    request.

    'insert' identifies the reply to examine through its 'op_index'.  On
    success, stores the UUID in 'insert.real' and returns True.  If the
    reply is missing or malformed, logs a warning and returns False."""
    n_ops = len(ops)
    if insert.op_index >= n_ops:
        # XXX rate-limit
        vlog.warn("reply does not contain enough operations "
                  "for insert (has %d, needs %d)"
                  % (n_ops, insert.op_index))
        return False

    # We know that this is a JSON object because the loop in
    # _process_reply() already checked.
    json_uuid = ops[insert.op_index].get("uuid")
    if not Transaction.__check_json_type(json_uuid, (tuple, list),
                                         '"insert" reply "uuid"'):
        return False

    try:
        real_uuid = ovs.ovsuuid.from_json(json_uuid)
    except error.Error:
        # XXX rate-limit
        vlog.warn('"insert" reply "uuid" is not a JSON UUID')
        return False
    else:
        insert.real = real_uuid
        return True
ad0991e6 EJ |
1188 | |
1189 | ||
class SchemaHelper(object):
    """IDL Schema helper.

    This class encapsulates the logic required to generate schemas suitable
    for creating 'ovs.db.idl.Idl' objects.  Clients should register the
    tables and columns they are interested in using register_columns(),
    register_table() or register_all().  When finished, the get_idl_schema()
    function may be called.

    The schema in its JSON representation is held in the 'schema_json'
    variable until get_idl_schema() is called."""

    def __init__(self, location=None, schema_json=None):
        """Creates a new Schema object.

        'location' file path to ovs schema. None means default location
        'schema_json' schema in json representation in memory

        At most one of 'location' and 'schema_json' may be specified;
        ValueError is raised if both are given.
        """

        if location and schema_json:
            raise ValueError("both location and schema_json can't be "
                             "specified. it's ambiguous.")
        if schema_json is None:
            if location is None:
                location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
            schema_json = ovs.json.from_file(location)

        self.schema_json = schema_json
        # Map from table name to the set of registered column names.  An
        # empty set means "every column of the table".
        self._tables = {}
        self._all = False

    def register_columns(self, table, columns):
        """Registers interest in the given 'columns' of 'table'.  Future calls
        to get_idl_schema() will include 'table':column for each column in
        'columns'. This function automatically avoids adding duplicate entries
        to the schema.

        If every column of 'table' has already been registered with
        register_table(), that registration is kept as it is.

        'table' must be a string.
        'columns' must be a list of strings.
        """

        assert type(table) is str
        assert type(columns) is list

        registered = self._tables.get(table)
        if registered is not None and not registered:
            # An empty set means that the whole table is already registered.
            # Merging 'columns' into it would silently narrow the
            # registration down to just those columns.
            return

        self._tables[table] = set(columns) | (registered or set())

    def register_table(self, table):
        """Registers interest in all the columns of 'table'. Future calls
        to get_idl_schema() will include all columns of 'table'.

        'table' must be a string
        """
        assert type(table) is str
        self._tables[table] = set()  # empty set means all columns in the table

    def register_all(self):
        """Registers interest in every column of every table."""
        self._all = True

    def get_idl_schema(self):
        """Gets a schema appropriate for the creation of an 'ovs.db.idl.Idl'
        object based on the tables and columns registered using
        register_columns(), register_table() and register_all().

        The stored JSON schema is released by this call ('schema_json'
        becomes None)."""

        schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
        self.schema_json = None

        if not self._all:
            schema_tables = {}
            for table, columns in self._tables.iteritems():
                schema_tables[table] = (
                    self._keep_table_columns(schema, table, columns))

            schema.tables = schema_tables
        return schema

    def _keep_table_columns(self, schema, table_name, columns):
        """Returns the table named 'table_name' from 'schema', reduced to
        just the columns named in 'columns'.  An empty 'columns' keeps every
        column.  The table object inside 'schema' is modified in place."""
        assert table_name in schema.tables
        table = schema.tables[table_name]

        if not columns:
            # empty set means all columns in the table
            return table

        new_columns = {}
        for column_name in columns:
            assert type(column_name) is str
            assert column_name in table.columns

            new_columns[column_name] = table.columns[column_name]

        table.columns = new_columns
        return table