]>
Commit | Line | Data |
---|---|---|
ad0991e6 | 1 | # Copyright (c) 2009, 2010, 2011, 2012 Nicira Networks |
99155935 BP |
2 | # |
3 | # Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | # you may not use this file except in compliance with the License. | |
5 | # You may obtain a copy of the License at: | |
6 | # | |
7 | # http://www.apache.org/licenses/LICENSE-2.0 | |
8 | # | |
9 | # Unless required by applicable law or agreed to in writing, software | |
10 | # distributed under the License is distributed on an "AS IS" BASIS, | |
11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | # See the License for the specific language governing permissions and | |
13 | # limitations under the License. | |
14 | ||
8cdf0349 | 15 | import uuid |
99155935 BP |
16 | |
17 | import ovs.jsonrpc | |
4c0f6271 | 18 | import ovs.db.parser |
99155935 BP |
19 | import ovs.db.schema |
20 | from ovs.db import error | |
21 | import ovs.ovsuuid | |
8cdf0349 | 22 | import ovs.poller |
3a656eaf EJ |
23 | import ovs.vlog |
24 | ||
25 | vlog = ovs.vlog.Vlog("idl") | |
99155935 | 26 | |
26bb0f31 EJ |
27 | __pychecker__ = 'no-classattr no-objattrs' |
28 | ||
29 | ||
99155935 BP |
class Idl(object):
    """Open vSwitch Database Interface Definition Language (OVSDB IDL).

    The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
    requests to an OVSDB database server and parses the responses, converting
    raw JSON into data structures that are easier for clients to digest.

    The IDL also assists with issuing database transactions.  The client
    creates a transaction, manipulates the IDL data structures, and commits or
    aborts the transaction.  The IDL then composes and issues the necessary
    JSON-RPC requests and reports to the client whether the transaction
    completed successfully.

    The client is allowed to access the following attributes directly, in a
    read-only fashion:

    - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
      to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
      annotated with a new attribute 'rows', which is a dict from a uuid.UUID
      to a Row object.

      The client may directly read and write the Row objects referenced by the
      'rows' map values.  Refer to Row for more details.

    - 'change_seqno': A number that represents the IDL's state.  When the IDL
      is updated (by Idl.run()), its value changes.

    - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
      if no lock is configured.

    - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
      lock, and False otherwise.

      Locking and unlocking happens asynchronously from the database client's
      point of view, so the information is only useful for optimization
      (e.g. if the client doesn't have the lock then there's no point in trying
      to write to the database).

    - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
      the database server has indicated that some other client already owns the
      requested lock, and False otherwise.

    - 'txn': The ovs.db.idl.Transaction object for the database transaction
      currently being constructed, if there is one, or None otherwise.
    """

    def __init__(self, remote, schema):
        """Creates a connection to the database described by 'schema' on
        'remote', which should be in a form acceptable to
        ovs.jsonrpc.Session.open().  The connection will maintain an in-memory
        replica of the remote database.

        'schema' should be the schema for the remote database.  The caller may
        have cut it down by removing tables or columns that are not of
        interest.  The IDL will only replicate the tables and columns that
        remain.  The caller may also add an attribute named 'alert' to selected
        remaining columns, setting its value to False; if so, then changes to
        those columns will not be considered changes to the database for the
        purpose of the return value of Idl.run() and Idl.change_seqno.  This is
        useful for columns that the IDL's client will write but not read.

        As a convenience to users, 'schema' may also be an instance of the
        SchemaHelper class.

        The IDL uses and modifies 'schema' directly."""

        if isinstance(schema, SchemaHelper):
            schema = schema.get_idl_schema()

        self.tables = schema.tables
        self._db = schema
        self._session = ovs.jsonrpc.Session.open(remote)
        self._monitor_request_id = None
        self._last_seqno = None
        self.change_seqno = 0

        # Database locking.
        self.lock_name = None           # Name of lock we need, None if none.
        self.has_lock = False           # Has db server said we have the lock?
        self.is_lock_contended = False  # Has db server said we can't get lock?
        self._lock_request_id = None    # JSON-RPC ID of in-flight lock request.

        # Transaction support.
        self.txn = None
        self._outstanding_txns = {}

        # Annotate the caller's schema in place: every column gets an 'alert'
        # flag (default True) and every table gets an empty replica.
        for table in schema.tables.itervalues():
            for column in table.columns.itervalues():
                if not hasattr(column, 'alert'):
                    column.alert = True
            table.need_table = False
            table.rows = {}
            table.idl = self

    def close(self):
        """Closes the connection to the database.  The IDL will no longer
        update."""
        self._session.close()

    def run(self):
        """Processes a batch of messages from the database server.  Returns
        True if the database as seen through the IDL changed, False if it did
        not change.  The initial fetch of the entire contents of the remote
        database is considered to be one kind of change.  If the IDL has been
        configured to acquire a database lock (with Idl.set_lock()), then
        successfully acquiring the lock is also considered to be a change.

        This function can return occasional false positives, that is, report
        that the database changed even though it didn't.  This happens if the
        connection to the database drops and reconnects, which causes the
        database contents to be reloaded even if they didn't change.  (It could
        also happen if the database server sends out a "change" that reflects
        what we already thought was in the database, but the database server is
        not supposed to do that.)

        As an alternative to checking the return value, the client may check
        for changes in self.change_seqno."""
        assert not self.txn
        initial_change_seqno = self.change_seqno
        self._session.run()

        # Handle at most 50 messages per call so that one call cannot
        # monopolize the caller's main loop.
        for _ in range(50):
            if not self._session.is_connected():
                break

            seqno = self._session.get_seqno()
            if seqno != self._last_seqno:
                # The session (re)connected: outstanding transactions are
                # lost, and the monitor and lock must be requested again.
                self._last_seqno = seqno
                self.__txn_abort_all()
                self.__send_monitor_request()
                if self.lock_name:
                    self.__send_lock_request()
                break

            msg = self._session.recv()
            if msg is None:
                break
            if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update"
                    and len(msg.params) == 2
                    and msg.params[0] is None):
                # Database contents changed.
                self.__parse_update(msg.params[1])
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                  and self._monitor_request_id is not None
                  and self._monitor_request_id == msg.id):
                # Reply to our "monitor" request.
                try:
                    self.change_seqno += 1
                    self._monitor_request_id = None
                    self.__clear()
                    self.__parse_update(msg.result)
                except error.Error as e:
                    vlog.err("%s: parse error in received schema: %s"
                             % (self._session.get_name(), e))
                    self.__error()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                  and self._lock_request_id is not None
                  and self._lock_request_id == msg.id):
                # Reply to our "lock" request.
                self.__parse_lock_reply(msg.result)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                  and msg.method == "locked"):
                # We got our lock.
                self.__parse_lock_notify(msg.params, True)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                  and msg.method == "stolen"):
                # Someone else stole our lock.
                self.__parse_lock_notify(msg.params, False)
            elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
                # Reply to our echo request.  Ignore it.
                # NOTE(review): an echo reply would normally be a T_REPLY, not
                # a T_NOTIFY -- confirm whether this branch can ever match.
                pass
            elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                               ovs.jsonrpc.Message.T_REPLY)
                  and self.__txn_process_reply(msg)):
                # __txn_process_reply() did everything needed.
                pass
            else:
                # This can happen if a transaction is destroyed before we
                # receive the reply, so keep the log level low.
                vlog.dbg("%s: received unexpected %s message"
                         % (self._session.get_name(),
                            ovs.jsonrpc.Message.type_to_string(msg.type)))

        return initial_change_seqno != self.change_seqno

    def wait(self, poller):
        """Arranges for poller.block() to wake up when self.run() has something
        to do or when activity occurs on a transaction on 'self'."""
        self._session.wait(poller)
        self._session.recv_wait(poller)

    def has_ever_connected(self):
        """Returns True, if the IDL successfully connected to the remote
        database and retrieved its contents (even if the connection
        subsequently dropped and is in the process of reconnecting).  If so,
        then the IDL contains an atomic snapshot of the database's contents
        (but it might be arbitrarily old if the connection dropped).

        Returns False if the IDL has never connected or retrieved the
        database's contents.  If so, the IDL is empty."""
        return self.change_seqno != 0

    def force_reconnect(self):
        """Forces the IDL to drop its connection to the database and reconnect.
        In the meantime, the contents of the IDL will not change."""
        self._session.force_reconnect()

    def set_lock(self, lock_name):
        """If 'lock_name' is not None, configures the IDL to obtain the named
        lock from the database server and to avoid modifying the database when
        the lock cannot be acquired (that is, when another client has the same
        lock).

        If 'lock_name' is None, drops the locking requirement and releases the
        lock."""
        assert not self.txn
        assert not self._outstanding_txns

        if self.lock_name and (not lock_name or lock_name != self.lock_name):
            # Release previous lock.
            self.__send_unlock_request()
            self.lock_name = None
            self.is_lock_contended = False

        if lock_name and not self.lock_name:
            # Acquire new lock.
            self.lock_name = lock_name
            self.__send_lock_request()

    def __clear(self):
        """Empties every table's replica, bumping 'change_seqno' if any table
        actually held rows."""
        changed = False

        for table in self.tables.itervalues():
            if table.rows:
                changed = True
                table.rows = {}

        if changed:
            self.change_seqno += 1

    def __update_has_lock(self, new_has_lock):
        """Records whether we hold the lock.  Newly acquiring the lock counts
        as a database change and clears 'is_lock_contended'."""
        if new_has_lock and not self.has_lock:
            if self._monitor_request_id is None:
                self.change_seqno += 1
            else:
                # We're waiting for a monitor reply, so don't signal that the
                # database changed.  The monitor reply will increment
                # change_seqno anyhow.
                pass
            self.is_lock_contended = False
        self.has_lock = new_has_lock

    def __do_send_lock_request(self, method):
        """Sends a 'method' ("lock" or "unlock") request for 'lock_name' if the
        session is connected.  Returns the request's JSON-RPC id, or None if
        nothing was sent."""
        self.__update_has_lock(False)
        self._lock_request_id = None
        if self._session.is_connected():
            msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
            msg_id = msg.id
            self._session.send(msg)
        else:
            msg_id = None
        return msg_id

    def __send_lock_request(self):
        """Requests the lock and remembers the request id so that run() can
        recognize the reply."""
        self._lock_request_id = self.__do_send_lock_request("lock")

    def __send_unlock_request(self):
        """Releases the lock.  The reply is not tracked."""
        self.__do_send_lock_request("unlock")

    def __parse_lock_reply(self, result):
        """Handles the reply to our "lock" request: we own the lock only if
        'result' is a dict whose "locked" member is exactly True."""
        self._lock_request_id = None
        got_lock = isinstance(result, dict) and result.get("locked") is True
        self.__update_has_lock(got_lock)
        if not got_lock:
            self.is_lock_contended = True

    def __parse_lock_notify(self, params, new_has_lock):
        """Handles a "locked" ('new_has_lock' True) or "stolen" ('new_has_lock'
        False) notification.  Notifications for any lock other than
        'lock_name' are ignored."""
        if (self.lock_name is not None
                and isinstance(params, (list, tuple))
                and params
                and params[0] == self.lock_name):
            # Pass only the new state: 'self' is already bound by the method
            # call, and passing it again raises TypeError.
            self.__update_has_lock(new_has_lock)
            if not new_has_lock:
                self.is_lock_contended = True

    def __send_monitor_request(self):
        """Asks the server to monitor every remaining column of every remaining
        table, and remembers the request id so that run() can recognize the
        reply."""
        monitor_requests = {}
        for table in self.tables.itervalues():
            monitor_requests[table.name] = {"columns": table.columns.keys()}
        msg = ovs.jsonrpc.Message.create_request(
            "monitor", [self._db.name, None, monitor_requests])
        self._monitor_request_id = msg.id
        self._session.send(msg)

    def __parse_update(self, update):
        """Applies a <table-updates> object to the replica, logging (rather
        than propagating) any parse error."""
        try:
            self.__do_parse_update(update)
        except error.Error as e:
            vlog.err("%s: error parsing update: %s"
                     % (self._session.get_name(), e))

    def __do_parse_update(self, table_updates):
        """Validates and applies a <table-updates> object, incrementing
        'change_seqno' once per row update that changed the replica.  Raises
        error.Error on malformed input."""
        if not isinstance(table_updates, dict):
            raise error.Error("<table-updates> is not an object",
                              table_updates)

        for table_name, table_update in table_updates.iteritems():
            table = self.tables.get(table_name)
            if not table:
                raise error.Error('<table-updates> includes unknown '
                                  'table "%s"' % table_name)

            if not isinstance(table_update, dict):
                raise error.Error('<table-update> for table "%s" is not '
                                  'an object' % table_name, table_update)

            for uuid_string, row_update in table_update.iteritems():
                if not ovs.ovsuuid.is_valid_string(uuid_string):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains bad UUID "%s" as member '
                                      'name' % (table_name, uuid_string),
                                      table_update)
                uuid = ovs.ovsuuid.from_string(uuid_string)

                if not isinstance(row_update, dict):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains <row-update> for %s that '
                                      'is not an object'
                                      % (table_name, uuid_string))

                parser = ovs.db.parser.Parser(row_update, "row-update")
                old = parser.get_optional("old", [dict])
                new = parser.get_optional("new", [dict])
                parser.finish()

                if not old and not new:
                    raise error.Error('<row-update> missing "old" and '
                                      '"new" members', row_update)

                if self.__process_update(table, uuid, old, new):
                    self.change_seqno += 1

    def __process_update(self, table, uuid, old, new):
        """Applies one <row-update> to 'table'.  No 'new' means delete, no
        'old' means insert, and both means modify.

        Returns True if a column changed, False otherwise."""
        row = table.rows.get(uuid)
        changed = False
        if not new:
            # Delete row.
            if row:
                del table.rows[uuid]
                changed = True
            else:
                # XXX rate-limit
                vlog.warn("cannot delete missing row %s from table %s"
                          % (uuid, table.name))
        elif not old:
            # Insert row.
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
            else:
                # XXX rate-limit
                vlog.warn("cannot add existing row %s to table %s"
                          % (uuid, table.name))
            if self.__row_update(table, row, new):
                changed = True
        else:
            # Modify row, creating it first if we somehow never saw it.
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
                # XXX rate-limit
                vlog.warn("cannot modify missing row %s in table %s"
                          % (uuid, table.name))
            if self.__row_update(table, row, new):
                changed = True
        return changed

    def __row_update(self, table, row, row_json):
        """Stores the columns in 'row_json' into 'row'.  Unknown or unparsable
        columns are logged and skipped.

        Returns True if the value of any column whose 'alert' flag is set
        changed, False otherwise."""
        changed = False
        for column_name, datum_json in row_json.iteritems():
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum = ovs.db.data.Datum.from_json(column.type, datum_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            if datum != row._data[column_name]:
                row._data[column_name] = datum
                if column.alert:
                    changed = True
            else:
                # Didn't really change but the OVSDB monitor protocol always
                # includes every value in a row.
                pass
        return changed

    def __create_row(self, table, uuid):
        """Adds to 'table' a new Row for 'uuid' whose columns all hold their
        type's default Datum, and returns that Row."""
        data = {}
        for column in table.columns.itervalues():
            data[column.name] = ovs.db.data.Datum.default(column.type)
        row = table.rows[uuid] = Row(self, table, uuid, data)
        return row

    def __error(self):
        """Recovers from a protocol error by forcing a reconnect."""
        self._session.force_reconnect()

    def __txn_abort_all(self):
        """Marks every outstanding transaction as AGAIN_WAIT and forgets
        it."""
        while self._outstanding_txns:
            txn = self._outstanding_txns.popitem()[1]
            txn._status = Transaction.AGAIN_WAIT

    def __txn_process_reply(self, msg):
        """Hands 'msg' to the outstanding transaction whose request id is
        'msg.id', if there is one.

        Returns True if 'msg' was consumed by a transaction, False otherwise,
        so that run() can tell a handled reply from an unexpected one."""
        txn = self._outstanding_txns.pop(msg.id, None)
        if txn:
            txn._process_reply(msg)
            return True
        return False
456 | ||
26bb0f31 | 457 | |
8cdf0349 BP |
def _uuid_to_row(atom, base):
    """Maps 'atom' to the object it refers to.

    If 'base' has a truthy 'ref_table', returns the entry of that table's
    'rows' dict keyed by 'atom' (None when there is no such entry).
    Otherwise returns 'atom' itself."""
    ref_table = base.ref_table
    if not ref_table:
        return atom
    return ref_table.rows.get(atom)
463 | ||
26bb0f31 | 464 | |
8cdf0349 BP |
def _row_to_uuid(value):
    """Inverse of _uuid_to_row(): returns the 'uuid' of 'value' when 'value'
    is exactly a Row, and 'value' itself otherwise."""
    if type(value) != Row:
        return value
    return value.uuid
bf6ec045 | 470 | |
26bb0f31 | 471 | |
class Row(object):
    """A row within an IDL.

    The client may access the following attributes directly:

    - 'uuid': a uuid.UUID object whose value is the row's database UUID.

    - An attribute for each column in the Row's table, named for the column,
      whose values are as returned by Datum.to_python() for the column's type.

      If some error occurs (e.g. the database server's idea of the column is
      different from the IDL's idea), then the attribute value is the
      "default" value returned by Datum.default() for the column's type.  (It
      is important to know this because the default value may violate
      constraints for the column's type, e.g. the default integer value is 0
      even if column constraints require the column's value to be positive.)

      When a transaction is active, column attributes may also be assigned new
      values.  Committing the transaction will then cause the new value to be
      stored into the database.

      *NOTE*: In the current implementation, the value of a column is a *copy*
      of the value in the database.  This means that modifying its value
      directly will have no useful effect.  For example, the following:
        row.mycolumn["a"] = "b"              # don't do this
      will not change anything in the database, even after commit.  To modify
      the column, instead assign the modified column value back to the column:
        d = row.mycolumn
        d["a"] = "b"
        row.mycolumn = d
"""
    def __init__(self, idl, table, uuid, data):
        # All of the explicit references to self.__dict__ below are required
        # to set real attributes without invoking self.__setattr__(), which
        # treats every assignment as a column write.
        self.__dict__["uuid"] = uuid

        self.__dict__["_idl"] = idl
        self.__dict__["_table"] = table

        # _data is the committed data.  It takes the following values:
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     exists in the committed form of the database.
        #
        #   - None, if this row is newly inserted within the active transaction
        #     and thus has no committed form.
        self.__dict__["_data"] = data

        # _changes describes changes to this row within the active transaction.
        # It takes the following values:
        #
        #   - {}, the empty dictionary, if no transaction is active or if the
        #     row has yet not been changed within this transaction.
        #
        #   - A dictionary that maps a column name to its new Datum, if an
        #     active transaction changes those columns' values.
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     is newly inserted within the active transaction.
        #
        #   - None, if this transaction deletes this row.
        self.__dict__["_changes"] = {}

        # A dictionary whose keys are the names of columns that must be
        # verified as prerequisites when the transaction commits.  The values
        # in the dictionary are all None.
        self.__dict__["_prereqs"] = {}

    def __getattr__(self, column_name):
        """Returns the Python value of column 'column_name', preferring a
        value written by the active transaction over the committed one.

        Raises AttributeError if the row has no value for 'column_name', as
        the attribute protocol requires, so that getattr() with a default
        works on rows."""
        assert self._changes is not None

        datum = self._changes.get(column_name)
        if datum is None:
            if self._data is None or column_name not in self._data:
                raise AttributeError("%s instance has no attribute '%s'"
                                     % (self.__class__.__name__, column_name))
            datum = self._data[column_name]

        return datum.to_python(_uuid_to_row)

    def __setattr__(self, column_name, value):
        """Writes 'value' to column 'column_name' as part of the active
        transaction.  A value that cannot be converted to the column's type
        is logged and otherwise ignored.

        A transaction must be in progress."""
        assert self._changes is not None
        assert self._idl.txn

        column = self._table.columns[column_name]
        try:
            datum = ovs.db.data.Datum.from_python(column.type, value,
                                                  _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        self._idl.txn._write(self, column, datum)

    def verify(self, column_name):
        """Causes the original contents of column 'column_name' in this row to
        be verified as a prerequisite to completing the transaction.  That is,
        if 'column_name' changed in this row (or if this row was deleted)
        between the time that the IDL originally read its contents and the time
        that the transaction commits, then the transaction aborts and
        Transaction.commit() returns Transaction.AGAIN_WAIT or
        Transaction.AGAIN_NOW (depending on whether the database change has
        already been received).

        The intention is that, to ensure that no transaction commits based on
        dirty reads, an application should call Row.verify() on each data item
        read as part of a read-modify-write operation.

        In some cases Row.verify() reduces to a no-op, because the current
        value of the column is already known:

          - If this row is a row created by the current transaction (returned
            by Transaction.insert()).

          - If the column has already been modified within the current
            transaction.

        Because of the latter property, always call Row.verify() *before*
        modifying the column, for a given read-modify-write.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if not self._data or column_name in self._changes:
            return

        self._prereqs[column_name] = None

    def delete(self):
        """Deletes this row from its table.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if self._data is None:
            # Inserted by this very transaction: just forget about it.
            del self._idl.txn._txn_rows[self.uuid]
        else:
            # A committed row must be registered with the transaction, or the
            # commit never sends the delete and an abort never restores the
            # row to its table.
            self._idl.txn._txn_rows[self.uuid] = self
        self.__dict__["_changes"] = None
        del self._table.rows[self.uuid]
608 | ||
26bb0f31 | 609 | |
8cdf0349 BP |
def _uuid_name_from_uuid(uuid):
    """Returns a name for 'uuid': "row" followed by the string form of
    'uuid' with each "-" replaced by "_"."""
    suffix = str(uuid).replace("-", "_")
    return "row" + suffix
612 | ||
26bb0f31 | 613 | |
8cdf0349 BP |
def _where_uuid_equals(uuid):
    """Returns a "where" clause (a list holding one condition) that matches
    the row whose "_uuid" column equals 'uuid'."""
    condition = ["_uuid", "==", ["uuid", str(uuid)]]
    return [condition]
616 | ||
26bb0f31 | 617 | |
8cdf0349 BP |
class _InsertedRow(object):
    """Bookkeeping record for a row inserted by a transaction."""

    def __init__(self, op_index):
        # 'real' starts out as None; nothing in this class ever sets it.
        self.real = None
        # Index supplied by the creator (Transaction.commit() passes the
        # position of the row's "insert" operation).
        self.op_index = op_index
622 | ||
26bb0f31 | 623 | |
8cdf0349 BP |
624 | class Transaction(object): |
625 | # Status values that Transaction.commit() can return. | |
26bb0f31 EJ |
626 | UNCOMMITTED = "uncommitted" # Not yet committed or aborted. |
627 | UNCHANGED = "unchanged" # Transaction didn't include any changes. | |
628 | INCOMPLETE = "incomplete" # Commit in progress, please wait. | |
629 | ABORTED = "aborted" # ovsdb_idl_txn_abort() called. | |
630 | SUCCESS = "success" # Commit successful. | |
4fdfe5cc BP |
631 | AGAIN_WAIT = "wait then try again" |
632 | # Commit failed because a "verify" operation | |
26bb0f31 | 633 | # reported an inconsistency, due to a network |
4fdfe5cc BP |
634 | # problem, or other transient failure. Wait |
635 | # for a change, then try again. | |
636 | AGAIN_NOW = "try again now" # Same as AGAIN_WAIT but try again right away. | |
26bb0f31 EJ |
637 | NOT_LOCKED = "not locked" # Server hasn't given us the lock yet. |
638 | ERROR = "error" # Commit failed due to a hard error. | |
8cdf0349 BP |
639 | |
640 | @staticmethod | |
def status_to_string(status):
    """Returns a human-readable string for 'status', one of the status
    values that Transaction.commit() can return.

    The status values are themselves human-readable strings, so 'status' is
    returned unchanged."""
    return status
648 | ||
def __init__(self, idl):
    """Begins a new transaction on 'idl' (an instance of ovs.db.idl.Idl)
    and registers it as that Idl's active transaction.  An Idl may have
    only one active transaction at a time.

    A Transaction may modify the contents of a database by assigning new
    values to columns (attributes of Row), deleting rows (with
    Row.delete()), or inserting rows (with Transaction.insert()).  It may
    also check that columns in the database have not changed with
    Row.verify().

    When a transaction is complete (which must be before the next call to
    Idl.run()), call Transaction.commit() or Transaction.abort()."""
    assert idl.txn is None

    # Link the transaction and the Idl to each other.
    self.idl = idl
    idl.txn = self

    # Commit state.
    self._status = Transaction.UNCOMMITTED
    self._request_id = None
    self._error = None
    self._commit_seqno = idl.change_seqno
    self.dry_run = False

    # What the transaction will do.
    self._txn_rows = {}
    self._comments = []
    self._inserted_rows = {}    # Map from UUID to _InsertedRow

    # Arguments of the single increment() request, if any.
    self._inc_table = None
    self._inc_column = None
    self._inc_where = None
8cdf0349 BP |
678 | |
def add_comment(self, comment):
    """Appends 'comment' to the comments that will be passed to the OVSDB
    server when this transaction is committed.  (The comment will be
    committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
    relatively human-readable form.)"""
    comments = self._comments
    comments.append(comment)
685 | ||
def increment(self, table, column, where):
    """Records 'table', 'column' and 'where' as this transaction's increment
    request.  At most one such request may be recorded per transaction."""
    assert not self._inc_table
    self._inc_table, self._inc_column, self._inc_where = (
        table, column, where)
691 | ||
def wait(self, poller):
    """Makes 'poller' wake up immediately unless this transaction's status
    is still UNCOMMITTED or INCOMPLETE."""
    still_pending = (Transaction.UNCOMMITTED, Transaction.INCOMPLETE)
    if self._status in still_pending:
        return
    poller.immediate_wake()
696 | ||
def _substitute_uuids(self, json):
    """Returns 'json' with every ["uuid", <uuid>] pair that names a row
    inserted by this transaction replaced by the matching
    ["named-uuid", <name>] pair.

    Lists and tuples are searched recursively, so references nested inside
    a larger JSON value (for example the members of a ["set", [...]]) are
    substituted as well.  Anything else is returned unchanged."""
    if type(json) in (list, tuple):
        if (len(json) == 2
                and json[0] == 'uuid'
                and ovs.ovsuuid.is_valid_string(json[1])):
            uuid = ovs.ovsuuid.from_string(json[1])
            row = self._txn_rows.get(uuid, None)
            # '_data is None' marks a row that has no committed form, i.e.
            # one inserted by this transaction.
            if row and row._data is None:
                return ["named-uuid", _uuid_name_from_uuid(uuid)]
        else:
            # Not a UUID pair itself: look inside each element.
            return [self._substitute_uuids(elem) for elem in json]
    return json
707 | ||
def __disassemble(self):
    """Detaches this transaction from its Idl and undoes the changes it made
    to the Idl's tables and rows."""
    self.idl.txn = None

    for txn_row in self._txn_rows.itervalues():
        table_rows = txn_row._table.rows
        if txn_row._changes is None:
            # Deleted by the transaction: put it back into its table.
            table_rows[txn_row.uuid] = txn_row
        elif txn_row._data is None:
            # Inserted by the transaction: take it back out of its table.
            del table_rows[txn_row.uuid]
        # Write through __dict__ so the reset is a plain attribute store.
        txn_row.__dict__["_changes"] = {}
        txn_row.__dict__["_prereqs"] = {}
    self._txn_rows = {}
719 | ||
def commit(self):
    """Attempts to commit this transaction and returns the status of the
    commit operation, one of the constants declared as class attributes.
    If the return value is Transaction.INCOMPLETE, then the transaction is
    not yet complete and the caller should try calling again later, after
    calling Idl.run() to run the Idl.

    Committing a transaction rolls back all of the changes that it made to
    the Idl's copy of the database.  If the transaction commits
    successfully, then the database server will send an update and, thus,
    the Idl will be updated with the committed changes.

    Statuses assigned directly by this function:

        Transaction.NOT_LOCKED: the Idl has a lock name configured but does
        not currently hold the lock.

        Transaction.UNCHANGED: nothing in the transaction counted as an
        update, so no request was sent.

        Transaction.INCOMPLETE: the "transact" request was handed to the
        session and a reply is awaited.

        Transaction.AGAIN_WAIT: the session reported a failure to send the
        request.

    If this transaction is no longer the Idl's active transaction, the
    current status is returned without doing anything else."""
    # The status can only change if we're the active transaction.
    # (Otherwise, our status will change only in Idl.run().)
    if self != self.idl.txn:
        return self._status

    # If we need a lock but don't have it, give up quickly.
    if self.idl.lock_name and not self.idl.has_lock():
        self._status = Transaction.NOT_LOCKED
        self.__disassemble()
        return self._status

    # The first element of the "transact" parameters is the database name,
    # not an operation.  This is why the position of an operation in the
    # reply is computed below as "len(operations) - 1" just before the
    # operation is appended.
    operations = [self.idl._db.name]

    # Assert that we have the required lock (avoiding a race).
    if self.idl.lock_name:
        operations.append({"op": "assert",
                           "lock": self.idl.lock_name})

    # Add prerequisites and declarations of new rows.
    #
    # For each row with prerequisite columns, emit a zero-timeout "wait"
    # operation that compares those columns against the values currently
    # held in the Idl's copy of the row.
    for row in self._txn_rows.itervalues():
        if row._prereqs:
            rows = {}
            columns = []
            for column_name in row._prereqs:
                columns.append(column_name)
                rows[column_name] = row._data[column_name].to_json()
            operations.append({"op": "wait",
                               "table": row._table.name,
                               "timeout": 0,
                               "where": _where_uuid_equals(row.uuid),
                               "until": "==",
                               "columns": columns,
                               "rows": [rows]})

    # Add updates.
    #
    # A row with '_changes' of None is handled as a deletion, a row with
    # '_data' of None as an insertion, and any other row with a nonempty
    # '_changes' as an update of an existing row.
    any_updates = False
    for row in self._txn_rows.itervalues():
        if row._changes is None:
            if row._table.is_root:
                operations.append({"op": "delete",
                                   "table": row._table.name,
                                   "where": _where_uuid_equals(row.uuid)})
                any_updates = True
            else:
                # Let ovsdb-server decide whether to really delete it.
                pass
        elif row._changes:
            op = {"table": row._table.name}
            if row._data is None:
                op["op"] = "insert"
                op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
                any_updates = True

                # Remember where the reply to this insert will be found so
                # that the UUID in that reply can be recorded later.
                op_index = len(operations) - 1
                self._inserted_rows[row.uuid] = _InsertedRow(op_index)
            else:
                op["op"] = "update"
                op["where"] = _where_uuid_equals(row.uuid)

            row_json = {}
            op["row"] = row_json

            for column_name, datum in row._changes.iteritems():
                # For an insert, columns holding their default value are
                # left out of the request.
                if row._data is not None or not datum.is_default():
                    row_json[column_name] = (
                        self._substitute_uuids(datum.to_json()))

                    # If anything really changed, consider it an update.
                    # We can't suppress not-really-changed values earlier
                    # or transactions would become nonatomic (see the big
                    # comment inside Transaction._write()).
                    if (not any_updates and row._data is not None and
                        row._data[column_name] != datum):
                        any_updates = True

            # An insert is always sent; an update is sent only if it has at
            # least one column to write.
            if row._data is None or row_json:
                operations.append(op)

    # Add increment.
    #
    # The increment is a "mutate" followed by a "select" of the same
    # column; '_inc_index' records the reply position of the "mutate", so
    # the "select" reply is at '_inc_index + 1'.
    if self._inc_table and any_updates:
        self._inc_index = len(operations) - 1

        operations.append({"op": "mutate",
                           "table": self._inc_table,
                           "where": self._substitute_uuids(
                               self._inc_where),
                           "mutations": [[self._inc_column, "+=", 1]]})
        operations.append({"op": "select",
                           "table": self._inc_table,
                           "where": self._substitute_uuids(
                               self._inc_where),
                           "columns": [self._inc_column]})

    # Add comment.
    if self._comments:
        operations.append({"op": "comment",
                           "comment": "\n".join(self._comments)})

    # Dry run?
    if self.dry_run:
        operations.append({"op": "abort"})

    if not any_updates:
        self._status = Transaction.UNCHANGED
    else:
        msg = ovs.jsonrpc.Message.create_request("transact", operations)
        self._request_id = msg.id
        # A false return value from send() is treated as success: the
        # transaction is then registered under its request id as
        # outstanding.
        if not self.idl._session.send(msg):
            self.idl._outstanding_txns[self._request_id] = self
            self._status = Transaction.INCOMPLETE
        else:
            self._status = Transaction.AGAIN_WAIT

    # Whatever the outcome, undo the transaction's local modifications.
    self.__disassemble()
    return self._status
846 | ||
def commit_block(self):
    """Calls commit() repeatedly, running the Idl and blocking in a poller
    between attempts, until commit() returns a status other than
    Transaction.INCOMPLETE.  Returns that status."""
    status = self.commit()
    while status == Transaction.INCOMPLETE:
        self.idl.run()

        # Sleep until either the Idl or this transaction has work to do.
        blocker = ovs.poller.Poller()
        self.idl.wait(blocker)
        self.wait(blocker)
        blocker.block()

        status = self.commit()
    return status
859 | ||
def get_increment_new_value(self):
    """Returns the value of the incremented column as recorded from the
    server's reply.  The transaction's status must be
    Transaction.SUCCESS."""
    succeeded = self._status == Transaction.SUCCESS
    assert succeeded
    return self._inc_new_value
863 | ||
def abort(self):
    """Aborts this transaction, undoing its changes to the Idl's in-memory
    tables.  If Transaction.commit() has already been called then the
    transaction might get committed anyhow.

    The status becomes Transaction.ABORTED only if it was still
    Transaction.UNCOMMITTED or Transaction.INCOMPLETE; any other status is
    left as it is."""
    self.__disassemble()
    if (self._status == Transaction.UNCOMMITTED
        or self._status == Transaction.INCOMPLETE):
        self._status = Transaction.ABORTED
871 | ||
def get_error(self):
    """Returns a string describing this transaction's current status,
    suitable for use in log messages.

    For a status of Transaction.ERROR this is the recorded error text, or
    a fixed placeholder if none was recorded; for any other status it is
    the name of the status."""
    if self._status == Transaction.ERROR:
        return self._error or "no error details available"
    return Transaction.status_to_string(self._status)
881 | ||
def __set_error_json(self, json):
    """Records 'json', serialized to a string, as this transaction's error
    details.  Only the first error is kept: if details were already
    recorded, this does nothing."""
    if self._error is not None:
        return
    self._error = ovs.json.to_string(json)
885 | ||
def get_insert_uuid(self, uuid):
    """Finds and returns the permanent UUID that the database assigned to a
    newly inserted row, given the UUID that Transaction.insert() assigned
    locally to that row.

    Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
    or if it was assigned by that function and then deleted by Row.delete()
    within the same transaction.  (Rows that are inserted and then deleted
    within a single transaction are never sent to the database server, so
    it never assigns them a permanent UUID.)

    This transaction's status must be Transaction.SUCCESS or
    Transaction.UNCHANGED."""
    finished_states = (Transaction.SUCCESS, Transaction.UNCHANGED)
    assert self._status in finished_states

    record = self._inserted_rows.get(uuid)
    if not record:
        return None
    return record.real
904 | ||
905 | def _write(self, row, column, datum): | |
906 | assert row._changes is not None | |
907 | ||
908 | txn = row._idl.txn | |
909 | ||
910 | # If this is a write-only column and the datum being written is the | |
911 | # same as the one already there, just skip the update entirely. This | |
912 | # is worth optimizing because we have a lot of columns that get | |
913 | # periodically refreshed into the database but don't actually change | |
914 | # that often. | |
915 | # | |
916 | # We don't do this for read/write columns because that would break | |
917 | # atomicity of transactions--some other client might have written a | |
918 | # different value in that column since we read it. (But if a whole | |
919 | # transaction only does writes of existing values, without making any | |
920 | # real changes, we will drop the whole transaction later in | |
921 | # ovsdb_idl_txn_commit().) | |
922 | if not column.alert and row._data.get(column.name) == datum: | |
923 | new_value = row._changes.get(column.name) | |
924 | if new_value is None or new_value == datum: | |
925 | return | |
926 | ||
927 | txn._txn_rows[row.uuid] = row | |
928 | row._changes[column.name] = datum.copy() | |
929 | ||
def insert(self, table, new_uuid=None):
    """Inserts and returns a new row in 'table', which must be one of the
    ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.

    The new row is assigned a provisional UUID.  If 'new_uuid' is None then
    one is randomly generated; otherwise 'new_uuid' should specify a
    randomly generated uuid.UUID not otherwise in use.  ovsdb-server will
    assign a different UUID when the transaction is committed, but the IDL
    will replace any uses of the provisional UUID in the data to be
    committed by the UUID assigned by ovsdb-server.

    The transaction's status must be Transaction.UNCOMMITTED."""
    assert self._status == Transaction.UNCOMMITTED

    if new_uuid is not None:
        provisional_uuid = new_uuid
    else:
        provisional_uuid = uuid.uuid4()

    # The row has no committed data yet, hence the final None.
    new_row = Row(self.idl, table, provisional_uuid, None)
    table.rows[new_row.uuid] = new_row
    self._txn_rows[new_row.uuid] = new_row
    return new_row
947 | ||
def _process_reply(self, msg):
    """Updates this transaction's status from 'msg', the JSON-RPC reply to
    its "transact" request.

    The resulting status is:

        Transaction.ERROR if 'msg' is an error reply, if its result is not
        a JSON array, if any operation reported an error other than the
        ones listed below, or if the replies to the increment or insert
        operations are malformed.

        Transaction.NOT_LOCKED if an operation failed with "not owner".

        Transaction.AGAIN_WAIT or Transaction.AGAIN_NOW if an operation
        timed out or was skipped because an earlier one failed, depending
        on whether the Idl's change sequence number still equals the one
        recorded in this transaction.

        Transaction.SUCCESS otherwise."""
    if msg.type == ovs.jsonrpc.Message.T_ERROR:
        self._status = Transaction.ERROR
    elif type(msg.result) not in (list, tuple):
        # XXX rate-limit
        vlog.warn('reply to "transact" is not JSON array')
        # A malformed reply is final: without a status change the
        # transaction would stay INCOMPLETE forever, because no further
        # reply will arrive for this request.
        self._status = Transaction.ERROR
    else:
        hard_errors = False
        soft_errors = False
        lock_errors = False

        ops = msg.result
        for op in ops:
            if op is None:
                # This isn't an error in itself but indicates that some
                # prior operation failed, so make sure that we know about
                # it.
                soft_errors = True
            elif isinstance(op, dict):
                op_error = op.get("error")
                if op_error is not None:
                    if op_error == "timed out":
                        soft_errors = True
                    elif op_error == "not owner":
                        lock_errors = True
                    elif op_error == "aborted":
                        pass
                    else:
                        hard_errors = True
                        self.__set_error_json(op)
            else:
                hard_errors = True
                self.__set_error_json(op)
                # XXX rate-limit
                vlog.warn("operation reply is not JSON null or object")

        # Only a fully successful reply carries usable increment and
        # insert results.
        if not soft_errors and not hard_errors and not lock_errors:
            if self._inc_table and not self.__process_inc_reply(ops):
                hard_errors = True

            for insert in self._inserted_rows.itervalues():
                if not self.__process_insert_reply(insert, ops):
                    hard_errors = True

        if hard_errors:
            self._status = Transaction.ERROR
        elif lock_errors:
            self._status = Transaction.NOT_LOCKED
        elif soft_errors:
            if self._commit_seqno == self.idl.change_seqno:
                self._status = Transaction.AGAIN_WAIT
            else:
                self._status = Transaction.AGAIN_NOW
        else:
            self._status = Transaction.SUCCESS
1003 | ||
1004 | @staticmethod | |
1005 | def __check_json_type(json, types, name): | |
1006 | if not json: | |
1007 | # XXX rate-limit | |
3a656eaf | 1008 | vlog.warn("%s is missing" % name) |
8cdf0349 BP |
1009 | return False |
1010 | elif type(json) not in types: | |
1011 | # XXX rate-limit | |
3a656eaf | 1012 | vlog.warn("%s has unexpected type %s" % (name, type(json))) |
8cdf0349 BP |
1013 | return False |
1014 | else: | |
1015 | return True | |
1016 | ||
def __process_inc_reply(self, ops):
    """Extracts the new value of the incremented column from 'ops', the
    list of operation replies to this transaction's "transact" request.

    The reply to the "mutate" operation is expected at index
    'self._inc_index' and must report a count of exactly 1; the reply to
    the "select" operation is expected right after it and must contain
    exactly one row holding an integer in the incremented column.

    On success stores that integer in 'self._inc_new_value' and returns
    True.  Otherwise logs a warning and returns False."""
    if self._inc_index + 2 > len(ops):
        # XXX rate-limit
        vlog.warn("reply does not contain enough operations for "
                  "increment (has %d, needs %d)" %
                  (len(ops), self._inc_index + 2))
        # Indexing 'ops' below would be out of range.
        return False

    # We know that this is a JSON object because the loop in
    # _process_reply() already checked.
    mutate = ops[self._inc_index]
    count = mutate.get("count")
    if not Transaction.__check_json_type(count, (int, long),
                                         '"mutate" reply "count"'):
        return False
    if count != 1:
        # XXX rate-limit
        vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
        return False

    select = ops[self._inc_index + 1]
    rows = select.get("rows")
    if not Transaction.__check_json_type(rows, (list, tuple),
                                         '"select" reply "rows"'):
        return False
    if len(rows) != 1:
        # XXX rate-limit
        vlog.warn('"select" reply "rows" has %d elements '
                  'instead of 1' % len(rows))
        return False
    row = rows[0]
    if not Transaction.__check_json_type(row, (dict,),
                                         '"select" reply row'):
        return False
    column = row.get(self._inc_column)
    if not Transaction.__check_json_type(column, (int, long),
                                         '"select" reply inc column'):
        return False
    self._inc_new_value = column
    return True
1056 | ||
def __process_insert_reply(self, insert, ops):
    """Extracts the UUID that the server assigned to an inserted row.

    'insert' is the record kept for the inserted row; its 'op_index' is
    the index in 'ops' of the reply to the row's "insert" operation.
    'ops' is the list of operation replies to this transaction's
    "transact" request.

    On success stores the UUID in 'insert.real' and returns True.
    Otherwise logs a warning and returns False."""
    if insert.op_index >= len(ops):
        # XXX rate-limit
        # Reading index 'op_index' requires 'op_index + 1' replies.
        vlog.warn("reply does not contain enough operations "
                  "for insert (has %d, needs %d)"
                  % (len(ops), insert.op_index + 1))
        return False

    # We know that this is a JSON object because the loop in
    # _process_reply() already checked.
    reply = ops[insert.op_index]
    json_uuid = reply.get("uuid")
    if not Transaction.__check_json_type(json_uuid, (tuple, list),
                                         '"insert" reply "uuid"'):
        return False

    try:
        uuid_ = ovs.ovsuuid.from_json(json_uuid)
    except error.Error:
        # XXX rate-limit
        vlog.warn('"insert" reply "uuid" is not a JSON UUID')
        return False

    insert.real = uuid_
    return True
ad0991e6 EJ |
1082 | |
1083 | ||
class SchemaHelper(object):
    """IDL Schema helper.

    This class encapsulates the logic required to generate schemas suitable
    for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
    they are interested in using register_columns().  When finished, the
    get_idl_schema() function may be called.

    The location on disk of the schema used may be found in the
    'schema_location' variable."""

    def __init__(self, location=None):
        """Creates a new SchemaHelper object.

        'location' is the path of the schema file to read.  If it is None,
        "vswitch.ovsschema" in ovs.dirs.PKGDATADIR is used."""

        if location is not None:
            self.schema_location = location
        else:
            self.schema_location = ("%s/vswitch.ovsschema"
                                    % ovs.dirs.PKGDATADIR)
        self._tables = {}

    def register_columns(self, table, columns):
        """Registers interest in the given 'columns' of 'table'.  Future calls
        to get_idl_schema() will include 'table':column for each column in
        'columns'.  This function automatically avoids adding duplicate entries
        to the schema.

        'table' must be a string.
        'columns' must be a list of strings.
        """

        assert type(table) is str
        assert type(columns) is list

        # Merge with whatever was registered for 'table' earlier.
        already_registered = self._tables.get(table, set())
        self._tables[table] = set(columns).union(already_registered)

    def get_idl_schema(self):
        """Gets a schema appropriate for the creation of an 'ovs.db.idl.Idl'
        object based on columns registered using the register_columns()
        function."""

        schema_json = ovs.json.from_file(self.schema_location)
        schema = ovs.db.schema.DbSchema.from_json(schema_json)

        # Keep only the registered tables, each reduced to its registered
        # columns.
        kept_tables = {}
        for table_name, column_names in self._tables.iteritems():
            kept = self._keep_table_columns(schema, table_name, column_names)
            kept_tables[table_name] = kept

        schema.tables = kept_tables
        return schema

    def _keep_table_columns(self, schema, table_name, columns):
        """Reduces table 'table_name' in 'schema' to the given 'columns' and
        returns the table.  The table and each column must exist in
        'schema', and each column name must be a string."""
        assert table_name in schema.tables
        table = schema.tables[table_name]

        kept_columns = {}
        for name in columns:
            assert type(name) is str
            assert name in table.columns

            kept_columns[name] = table.columns[name]

        table.columns = kept_columns
        return table