]>
Commit | Line | Data |
---|---|---|
0164e367 | 1 | # Copyright (c) 2009, 2010, 2011, 2012, 2013, 2016 Nicira, Inc. |
99155935 BP |
2 | # |
3 | # Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | # you may not use this file except in compliance with the License. | |
5 | # You may obtain a copy of the License at: | |
6 | # | |
7 | # http://www.apache.org/licenses/LICENSE-2.0 | |
8 | # | |
9 | # Unless required by applicable law or agreed to in writing, software | |
10 | # distributed under the License is distributed on an "AS IS" BASIS, | |
11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | # See the License for the specific language governing permissions and | |
13 | # limitations under the License. | |
14 | ||
fbafc3c2 | 15 | import functools |
8cdf0349 | 16 | import uuid |
99155935 | 17 | |
cb96c1b2 RB |
18 | import six |
19 | ||
99155935 | 20 | import ovs.jsonrpc |
a59912a0 | 21 | import ovs.db.data as data |
4c0f6271 | 22 | import ovs.db.parser |
99155935 BP |
23 | import ovs.db.schema |
24 | from ovs.db import error | |
25 | import ovs.ovsuuid | |
8cdf0349 | 26 | import ovs.poller |
3a656eaf EJ |
27 | import ovs.vlog |
28 | ||
29 | vlog = ovs.vlog.Vlog("idl") | |
99155935 | 30 | |
26bb0f31 EJ |
31 | __pychecker__ = 'no-classattr no-objattrs' |
32 | ||
d7d417fc TW |
# Event types passed to Idl.notify() to describe how a row changed.
ROW_CREATE = "create"
ROW_UPDATE = "update"
ROW_DELETE = "delete"

# Wire-format versions of update notifications: OVSDB_UPDATE is the original
# "update" method, OVSDB_UPDATE2 the "update2" method used with monitor_cond.
OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1
39 | ||
d7d417fc TW |
40 | |
class Idl(object):
    """Open vSwitch Database Interface Definition Language (OVSDB IDL).

    The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
    requests to an OVSDB database server and parses the responses, converting
    raw JSON into data structures that are easier for clients to digest.

    The IDL also assists with issuing database transactions.  The client
    creates a transaction, manipulates the IDL data structures, and commits or
    aborts the transaction.  The IDL then composes and issues the necessary
    JSON-RPC requests and reports to the client whether the transaction
    completed successfully.

    The client is allowed to access the following attributes directly, in a
    read-only fashion:

    - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema
      provided to the Idl constructor.  Each ovs.db.schema.TableSchema in the
      map is annotated with a new attribute 'rows', which is a dict from a
      uuid.UUID to a Row object.

      The client may directly read and write the Row objects referenced by
      the 'rows' map values.  Refer to Row for more details.

    - 'change_seqno': A number that represents the IDL's state.  When the IDL
      is updated (by Idl.run()), its value changes.  The sequence number can
      occasionally change even if the database does not.  This happens if the
      connection to the database drops and reconnects, which causes the
      database contents to be reloaded even if they didn't change.  (It could
      also happen if the database server sends out a "change" that reflects
      what the IDL already thought was in the database.  The database server
      is not supposed to do that, but bugs could in theory cause it to do so.)

    - 'lock_name': The name of the lock configured with Idl.set_lock(), or
      None if no lock is configured.

    - 'has_lock': True, if the IDL is configured to obtain a lock and owns
      that lock, and False otherwise.

      Locking and unlocking happens asynchronously from the database client's
      point of view, so the information is only useful for optimization
      (e.g. if the client doesn't have the lock then there's no point in
      trying to write to the database).

    - 'is_lock_contended': True, if the IDL is configured to obtain a lock
      but the database server has indicated that some other client already
      owns the requested lock, and False otherwise.

    - 'txn': The ovs.db.idl.Transaction object for the database transaction
      currently being constructed, if there is one, or None otherwise.
    """

    # Monitor state machine (see __send_monitor_request() and run()):
    IDL_S_INITIAL = 0                 # No monitor request sent yet.
    IDL_S_MONITOR_REQUESTED = 1       # Plain "monitor" request in flight.
    IDL_S_MONITOR_COND_REQUESTED = 2  # "monitor_cond" request in flight.
    def __init__(self, remote, schema):
        """Creates and returns a connection to the database named 'db_name' on
        'remote', which should be in a form acceptable to
        ovs.jsonrpc.session.open().  The connection will maintain an in-memory
        replica of the remote database.

        'schema' should be the schema for the remote database.  The caller may
        have cut it down by removing tables or columns that are not of
        interest.  The IDL will only replicate the tables and columns that
        remain.  The caller may also add an attribute named 'alert' to
        selected remaining columns, setting its value to False; if so, then
        changes to those columns will not be considered changes to the
        database for the purpose of the return value of Idl.run() and
        Idl.change_seqno.  This is useful for columns that the IDL's client
        will write but not read.

        As a convenience to users, 'schema' may also be an instance of the
        SchemaHelper class.

        The IDL uses and modifies 'schema' directly."""

        assert isinstance(schema, SchemaHelper)
        schema = schema.get_idl_schema()

        self.tables = schema.tables
        self.readonly = schema.readonly
        self._db = schema
        self._session = ovs.jsonrpc.Session.open(remote)
        self._monitor_request_id = None
        self._last_seqno = None
        self.change_seqno = 0
        self.uuid = uuid.uuid1()        # Identifies this monitor session.
        self.state = self.IDL_S_INITIAL

        # Database locking.
        self.lock_name = None          # Name of lock we need, None if none.
        self.has_lock = False          # Has db server said we have the lock?
        self.is_lock_contended = False  # Has db server said we can't get lock?
        self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.

        # Transaction support.
        self.txn = None
        self._outstanding_txns = {}

        for table in six.itervalues(schema.tables):
            for column in six.itervalues(table.columns):
                if not hasattr(column, 'alert'):
                    column.alert = True
            table.need_table = False
            table.rows = {}
            table.idl = self
            # [True] means "replicate every row"; see cond_change().
            table.condition = [True]
            table.cond_changed = False
99155935 BP |
149 | |
150 | def close(self): | |
8cdf0349 BP |
151 | """Closes the connection to the database. The IDL will no longer |
152 | update.""" | |
153 | self._session.close() | |
99155935 BP |
154 | |
    def run(self):
        """Processes a batch of messages from the database server.  Returns
        True if the database as seen through the IDL changed, False if it did
        not change.  The initial fetch of the entire contents of the remote
        database is considered to be one kind of change.  If the IDL has been
        configured to acquire a database lock (with Idl.set_lock()), then
        successfully acquiring the lock is also considered to be a change.

        This function can return occasional false positives, that is, report
        that the database changed even though it didn't.  This happens if the
        connection to the database drops and reconnects, which causes the
        database contents to be reloaded even if they didn't change.  (It
        could also happen if the database server sends out a "change" that
        reflects what we already thought was in the database, but the database
        server is not supposed to do that.)

        As an alternative to checking the return value, the client may check
        for changes in self.change_seqno."""
        assert not self.txn
        initial_change_seqno = self.change_seqno

        self.send_cond_change()
        self._session.run()
        # Process at most 50 messages per call so one busy server cannot
        # starve the caller's event loop.
        i = 0
        while i < 50:
            i += 1
            if not self._session.is_connected():
                break

            seqno = self._session.get_seqno()
            if seqno != self._last_seqno:
                # The session (re)connected: restart from scratch.
                self._last_seqno = seqno
                self.__txn_abort_all()
                self.__send_monitor_request()
                if self.lock_name:
                    self.__send_lock_request()
                break

            msg = self._session.recv()
            if msg is None:
                break
            if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update2"
                    and len(msg.params) == 2):
                # Database contents changed.
                self.__parse_update(msg.params[1], OVSDB_UPDATE2)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update"
                    and len(msg.params) == 2):
                # Database contents changed.
                self.__parse_update(msg.params[1], OVSDB_UPDATE)
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._monitor_request_id is not None
                    and self._monitor_request_id == msg.id):
                # Reply to our "monitor" request.
                try:
                    self.change_seqno += 1
                    self._monitor_request_id = None
                    self.__clear()
                    if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
                        self.__parse_update(msg.result, OVSDB_UPDATE2)
                    else:
                        assert self.state == self.IDL_S_MONITOR_REQUESTED
                        self.__parse_update(msg.result, OVSDB_UPDATE)

                except error.Error as e:
                    vlog.err("%s: parse error in received schema: %s"
                             % (self._session.get_name(), e))
                    self.__error()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._lock_request_id is not None
                    and self._lock_request_id == msg.id):
                # Reply to our "lock" request.
                self.__parse_lock_reply(msg.result)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "locked"):
                # We got our lock.
                self.__parse_lock_notify(msg.params, True)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "stolen"):
                # Someone else stole our lock.
                self.__parse_lock_notify(msg.params, False)
            elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
                # Reply to our echo request.  Ignore it.
                pass
            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                  self.state == self.IDL_S_MONITOR_COND_REQUESTED and
                  self._monitor_request_id == msg.id):
                # Server rejected "monitor_cond"; fall back to plain
                # "monitor" (the state machine sends it on the retry).
                if msg.error == "unknown method":
                    self.__send_monitor_request()
            elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                               ovs.jsonrpc.Message.T_REPLY)
                    and self.__txn_process_reply(msg)):
                # __txn_process_reply() did everything needed.
                pass
            else:
                # This can happen if a transaction is destroyed before we
                # receive the reply, so keep the log level low.
                vlog.dbg("%s: received unexpected %s message"
                         % (self._session.get_name(),
                            ovs.jsonrpc.Message.type_to_string(msg.type)))

        return initial_change_seqno != self.change_seqno
258 | ||
16ebb90e LS |
259 | def send_cond_change(self): |
260 | if not self._session.is_connected(): | |
261 | return | |
262 | ||
263 | for table in six.itervalues(self.tables): | |
264 | if table.cond_changed: | |
265 | self.__send_cond_change(table, table.condition) | |
266 | table.cond_changed = False | |
267 | ||
0164e367 BP |
268 | def cond_change(self, table_name, cond): |
269 | """Sets the condition for 'table_name' to 'cond', which should be a | |
270 | conditional expression suitable for use directly in the OVSDB | |
271 | protocol, with the exception that the empty condition [] | |
272 | matches no rows (instead of matching every row). That is, [] | |
273 | is equivalent to [False], not to [True]. | |
274 | """ | |
16ebb90e | 275 | |
897c8064 LS |
276 | table = self.tables.get(table_name) |
277 | if not table: | |
278 | raise error.Error('Unknown table "%s"' % table_name) | |
16ebb90e | 279 | |
0164e367 BP |
280 | if cond == []: |
281 | cond = [False] | |
282 | if table.condition != cond: | |
283 | table.condition = cond | |
284 | table.cond_changed = True | |
897c8064 | 285 | |
99155935 BP |
286 | def wait(self, poller): |
287 | """Arranges for poller.block() to wake up when self.run() has something | |
288 | to do or when activity occurs on a transaction on 'self'.""" | |
8cdf0349 BP |
289 | self._session.wait(poller) |
290 | self._session.recv_wait(poller) | |
99155935 | 291 | |
8cdf0349 BP |
292 | def has_ever_connected(self): |
293 | """Returns True, if the IDL successfully connected to the remote | |
294 | database and retrieved its contents (even if the connection | |
295 | subsequently dropped and is in the process of reconnecting). If so, | |
296 | then the IDL contains an atomic snapshot of the database's contents | |
297 | (but it might be arbitrarily old if the connection dropped). | |
298 | ||
299 | Returns False if the IDL has never connected or retrieved the | |
300 | database's contents. If so, the IDL is empty.""" | |
301 | return self.change_seqno != 0 | |
302 | ||
303 | def force_reconnect(self): | |
304 | """Forces the IDL to drop its connection to the database and reconnect. | |
305 | In the meantime, the contents of the IDL will not change.""" | |
306 | self._session.force_reconnect() | |
307 | ||
308 | def set_lock(self, lock_name): | |
309 | """If 'lock_name' is not None, configures the IDL to obtain the named | |
310 | lock from the database server and to avoid modifying the database when | |
311 | the lock cannot be acquired (that is, when another client has the same | |
312 | lock). | |
313 | ||
314 | If 'lock_name' is None, drops the locking requirement and releases the | |
315 | lock.""" | |
316 | assert not self.txn | |
317 | assert not self._outstanding_txns | |
318 | ||
319 | if self.lock_name and (not lock_name or lock_name != self.lock_name): | |
320 | # Release previous lock. | |
321 | self.__send_unlock_request() | |
322 | self.lock_name = None | |
323 | self.is_lock_contended = False | |
324 | ||
325 | if lock_name and not self.lock_name: | |
326 | # Acquire new lock. | |
327 | self.lock_name = lock_name | |
328 | self.__send_lock_request() | |
329 | ||
d7d417fc TW |
    def notify(self, event, row, updates=None):
        """Hook for implementing create/update/delete notifications.

        The base implementation does nothing; subclasses override it to
        observe replica changes.

        :param event: The event that was triggered
        :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
        :param row: The row as it is after the operation has occurred
        :type row: Row
        :param updates: For updates, row with only old values of the changed
                        columns
        :type updates: Row
        """
341 | ||
897c8064 LS |
342 | def __send_cond_change(self, table, cond): |
343 | monitor_cond_change = {table.name: [{"where": cond}]} | |
344 | old_uuid = str(self.uuid) | |
345 | self.uuid = uuid.uuid1() | |
346 | params = [old_uuid, str(self.uuid), monitor_cond_change] | |
347 | msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params) | |
348 | self._session.send(msg) | |
349 | ||
8cdf0349 BP |
350 | def __clear(self): |
351 | changed = False | |
352 | ||
cb96c1b2 | 353 | for table in six.itervalues(self.tables): |
8cdf0349 BP |
354 | if table.rows: |
355 | changed = True | |
356 | table.rows = {} | |
99155935 | 357 | |
8cdf0349 BP |
358 | if changed: |
359 | self.change_seqno += 1 | |
360 | ||
361 | def __update_has_lock(self, new_has_lock): | |
362 | if new_has_lock and not self.has_lock: | |
363 | if self._monitor_request_id is None: | |
364 | self.change_seqno += 1 | |
365 | else: | |
366 | # We're waiting for a monitor reply, so don't signal that the | |
367 | # database changed. The monitor reply will increment | |
368 | # change_seqno anyhow. | |
369 | pass | |
370 | self.is_lock_contended = False | |
371 | self.has_lock = new_has_lock | |
372 | ||
373 | def __do_send_lock_request(self, method): | |
374 | self.__update_has_lock(False) | |
375 | self._lock_request_id = None | |
376 | if self._session.is_connected(): | |
377 | msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name]) | |
378 | msg_id = msg.id | |
379 | self._session.send(msg) | |
380 | else: | |
381 | msg_id = None | |
382 | return msg_id | |
383 | ||
    def __send_lock_request(self):
        # Ask the server for self.lock_name, remembering the request id so
        # run() can match the reply.
        self._lock_request_id = self.__do_send_lock_request("lock")
386 | ||
    def __send_unlock_request(self):
        # Release self.lock_name; no reply tracking is needed for unlock.
        self.__do_send_lock_request("unlock")
389 | ||
390 | def __parse_lock_reply(self, result): | |
391 | self._lock_request_id = None | |
da2d45c6 | 392 | got_lock = isinstance(result, dict) and result.get("locked") is True |
8cdf0349 BP |
393 | self.__update_has_lock(got_lock) |
394 | if not got_lock: | |
395 | self.is_lock_contended = True | |
396 | ||
397 | def __parse_lock_notify(self, params, new_has_lock): | |
398 | if (self.lock_name is not None | |
da2d45c6 | 399 | and isinstance(params, (list, tuple)) |
8cdf0349 BP |
400 | and params |
401 | and params[0] == self.lock_name): | |
e9ad9219 | 402 | self.__update_has_lock(new_has_lock) |
8cdf0349 BP |
403 | if not new_has_lock: |
404 | self.is_lock_contended = True | |
99155935 BP |
405 | |
406 | def __send_monitor_request(self): | |
897c8064 LS |
407 | if self.state == self.IDL_S_INITIAL: |
408 | self.state = self.IDL_S_MONITOR_COND_REQUESTED | |
409 | method = "monitor_cond" | |
410 | else: | |
411 | self.state = self.IDL_S_MONITOR_REQUESTED | |
412 | method = "monitor" | |
413 | ||
99155935 | 414 | monitor_requests = {} |
cb96c1b2 | 415 | for table in six.itervalues(self.tables): |
80c12152 | 416 | columns = [] |
cb96c1b2 | 417 | for column in six.iterkeys(table.columns): |
80c12152 | 418 | if ((table.name not in self.readonly) or |
897c8064 LS |
419 | (table.name in self.readonly) and |
420 | (column not in self.readonly[table.name])): | |
80c12152 SA |
421 | columns.append(column) |
422 | monitor_requests[table.name] = {"columns": columns} | |
0164e367 | 423 | if method == "monitor_cond" and table.condition != [True]: |
897c8064 | 424 | monitor_requests[table.name]["where"] = table.condition |
16ebb90e | 425 | table.cond_change = False |
897c8064 | 426 | |
99155935 | 427 | msg = ovs.jsonrpc.Message.create_request( |
897c8064 | 428 | method, [self._db.name, str(self.uuid), monitor_requests]) |
8cdf0349 BP |
429 | self._monitor_request_id = msg.id |
430 | self._session.send(msg) | |
99155935 | 431 | |
897c8064 | 432 | def __parse_update(self, update, version): |
99155935 | 433 | try: |
897c8064 | 434 | self.__do_parse_update(update, version) |
3ab76c56 | 435 | except error.Error as e: |
3a656eaf EJ |
436 | vlog.err("%s: error parsing update: %s" |
437 | % (self._session.get_name(), e)) | |
99155935 | 438 | |
897c8064 | 439 | def __do_parse_update(self, table_updates, version): |
da2d45c6 | 440 | if not isinstance(table_updates, dict): |
99155935 BP |
441 | raise error.Error("<table-updates> is not an object", |
442 | table_updates) | |
443 | ||
cb96c1b2 | 444 | for table_name, table_update in six.iteritems(table_updates): |
8cdf0349 | 445 | table = self.tables.get(table_name) |
99155935 | 446 | if not table: |
f2d8ad13 BP |
447 | raise error.Error('<table-updates> includes unknown ' |
448 | 'table "%s"' % table_name) | |
99155935 | 449 | |
da2d45c6 | 450 | if not isinstance(table_update, dict): |
f2d8ad13 BP |
451 | raise error.Error('<table-update> for table "%s" is not ' |
452 | 'an object' % table_name, table_update) | |
99155935 | 453 | |
cb96c1b2 | 454 | for uuid_string, row_update in six.iteritems(table_update): |
49c541dc | 455 | if not ovs.ovsuuid.is_valid_string(uuid_string): |
f2d8ad13 BP |
456 | raise error.Error('<table-update> for table "%s" ' |
457 | 'contains bad UUID "%s" as member ' | |
458 | 'name' % (table_name, uuid_string), | |
99155935 | 459 | table_update) |
49c541dc | 460 | uuid = ovs.ovsuuid.from_string(uuid_string) |
99155935 | 461 | |
da2d45c6 | 462 | if not isinstance(row_update, dict): |
f2d8ad13 BP |
463 | raise error.Error('<table-update> for table "%s" ' |
464 | 'contains <row-update> for %s that ' | |
465 | 'is not an object' | |
99155935 BP |
466 | % (table_name, uuid_string)) |
467 | ||
897c8064 LS |
468 | if version == OVSDB_UPDATE2: |
469 | if self.__process_update2(table, uuid, row_update): | |
470 | self.change_seqno += 1 | |
471 | continue | |
472 | ||
af1eba26 | 473 | parser = ovs.db.parser.Parser(row_update, "row-update") |
4c0f6271 BP |
474 | old = parser.get_optional("old", [dict]) |
475 | new = parser.get_optional("new", [dict]) | |
476 | parser.finish() | |
99155935 | 477 | |
99155935 | 478 | if not old and not new: |
f2d8ad13 BP |
479 | raise error.Error('<row-update> missing "old" and ' |
480 | '"new" members', row_update) | |
99155935 | 481 | |
8cdf0349 | 482 | if self.__process_update(table, uuid, old, new): |
99155935 BP |
483 | self.change_seqno += 1 |
484 | ||
897c8064 LS |
485 | def __process_update2(self, table, uuid, row_update): |
486 | row = table.rows.get(uuid) | |
487 | changed = False | |
488 | if "delete" in row_update: | |
489 | if row: | |
490 | del table.rows[uuid] | |
491 | self.notify(ROW_DELETE, row) | |
492 | changed = True | |
493 | else: | |
494 | # XXX rate-limit | |
495 | vlog.warn("cannot delete missing row %s from table" | |
496 | "%s" % (uuid, table.name)) | |
497 | elif "insert" in row_update or "initial" in row_update: | |
498 | if row: | |
499 | vlog.warn("cannot add existing row %s from table" | |
500 | " %s" % (uuid, table.name)) | |
501 | del table.rows[uuid] | |
502 | row = self.__create_row(table, uuid) | |
503 | if "insert" in row_update: | |
504 | row_update = row_update['insert'] | |
505 | else: | |
506 | row_update = row_update['initial'] | |
507 | self.__add_default(table, row_update) | |
508 | if self.__row_update(table, row, row_update): | |
509 | changed = True | |
510 | self.notify(ROW_CREATE, row) | |
511 | elif "modify" in row_update: | |
512 | if not row: | |
513 | raise error.Error('Modify non-existing row') | |
514 | ||
a7261bf7 NS |
515 | old_row_diff_json = self.__apply_diff(table, row, |
516 | row_update['modify']) | |
897c8064 | 517 | self.notify(ROW_UPDATE, row, |
a7261bf7 | 518 | Row.from_json(self, table, uuid, old_row_diff_json)) |
897c8064 LS |
519 | changed = True |
520 | else: | |
521 | raise error.Error('<row-update> unknown operation', | |
522 | row_update) | |
523 | return changed | |
524 | ||
8cdf0349 | 525 | def __process_update(self, table, uuid, old, new): |
99155935 | 526 | """Returns True if a column changed, False otherwise.""" |
8cdf0349 | 527 | row = table.rows.get(uuid) |
7d48f8f8 | 528 | changed = False |
99155935 BP |
529 | if not new: |
530 | # Delete row. | |
531 | if row: | |
8cdf0349 | 532 | del table.rows[uuid] |
7d48f8f8 | 533 | changed = True |
d7d417fc | 534 | self.notify(ROW_DELETE, row) |
99155935 BP |
535 | else: |
536 | # XXX rate-limit | |
3a656eaf EJ |
537 | vlog.warn("cannot delete missing row %s from table %s" |
538 | % (uuid, table.name)) | |
99155935 BP |
539 | elif not old: |
540 | # Insert row. | |
541 | if not row: | |
542 | row = self.__create_row(table, uuid) | |
7d48f8f8 | 543 | changed = True |
99155935 BP |
544 | else: |
545 | # XXX rate-limit | |
3a656eaf EJ |
546 | vlog.warn("cannot add existing row %s to table %s" |
547 | % (uuid, table.name)) | |
8cdf0349 | 548 | if self.__row_update(table, row, new): |
7d48f8f8 | 549 | changed = True |
d7d417fc | 550 | self.notify(ROW_CREATE, row) |
99155935 | 551 | else: |
d7d417fc | 552 | op = ROW_UPDATE |
99155935 BP |
553 | if not row: |
554 | row = self.__create_row(table, uuid) | |
7d48f8f8 | 555 | changed = True |
d7d417fc | 556 | op = ROW_CREATE |
99155935 | 557 | # XXX rate-limit |
3a656eaf EJ |
558 | vlog.warn("cannot modify missing row %s in table %s" |
559 | % (uuid, table.name)) | |
8cdf0349 | 560 | if self.__row_update(table, row, new): |
7d48f8f8 | 561 | changed = True |
d7d417fc | 562 | self.notify(op, row, Row.from_json(self, table, uuid, old)) |
7d48f8f8 | 563 | return changed |
99155935 | 564 | |
897c8064 LS |
    def __column_name(self, column):
        # Returns the default value for 'column' in JSON form, converting
        # UUID defaults with ovs.ovsuuid (used by __add_default() to fill in
        # columns the server omitted).
        if column.type.key.type == ovs.db.types.UuidType:
            return ovs.ovsuuid.to_json(column.type.key.type.default)
        else:
            return column.type.key.type.default
570 | ||
    def __add_default(self, table, row_update):
        # Fills 'row_update' in place with default values for any writable,
        # required (n_min != 0), non-map columns the server omitted, so that
        # __row_update() sees a complete row.
        for column in six.itervalues(table.columns):
            if column.name not in row_update:
                if ((table.name not in self.readonly) or
                        (table.name in self.readonly) and
                        (column.name not in self.readonly[table.name])):
                    if column.type.n_min != 0 and not column.type.is_map():
                        row_update[column.name] = self.__column_name(column)
579 | ||
    def __apply_diff(self, table, row, row_diff):
        """Applies a <row-update2> "modify" diff to 'row'.

        Returns a JSON object mapping each diffed column name to its old
        value, suitable for building the 'updates' Row passed to
        notify()."""
        old_row_diff_json = {}
        for column_name, datum_diff_json in six.iteritems(row_diff):
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum_diff = data.Datum.from_json(column.type, datum_diff_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            old_row_diff_json[column_name] = row._data[column_name].to_json()
            # Datum.diff() computes the post-diff value of the column.
            datum = row._data[column_name].diff(datum_diff)
            if datum != row._data[column_name]:
                row._data[column_name] = datum

        return old_row_diff_json
604 | ||
8cdf0349 | 605 | def __row_update(self, table, row, row_json): |
99155935 | 606 | changed = False |
cb96c1b2 | 607 | for column_name, datum_json in six.iteritems(row_json): |
99155935 BP |
608 | column = table.columns.get(column_name) |
609 | if not column: | |
610 | # XXX rate-limit | |
3a656eaf EJ |
611 | vlog.warn("unknown column %s updating table %s" |
612 | % (column_name, table.name)) | |
99155935 BP |
613 | continue |
614 | ||
615 | try: | |
a59912a0 | 616 | datum = data.Datum.from_json(column.type, datum_json) |
3ab76c56 | 617 | except error.Error as e: |
99155935 | 618 | # XXX rate-limit |
3a656eaf EJ |
619 | vlog.warn("error parsing column %s in table %s: %s" |
620 | % (column_name, table.name, e)) | |
99155935 BP |
621 | continue |
622 | ||
8cdf0349 BP |
623 | if datum != row._data[column_name]: |
624 | row._data[column_name] = datum | |
625 | if column.alert: | |
626 | changed = True | |
99155935 BP |
627 | else: |
628 | # Didn't really change but the OVSDB monitor protocol always | |
629 | # includes every value in a row. | |
630 | pass | |
631 | return changed | |
632 | ||
99155935 | 633 | def __create_row(self, table, uuid): |
8cdf0349 | 634 | data = {} |
cb96c1b2 | 635 | for column in six.itervalues(table.columns): |
8cdf0349 BP |
636 | data[column.name] = ovs.db.data.Datum.default(column.type) |
637 | row = table.rows[uuid] = Row(self, table, uuid, data) | |
99155935 BP |
638 | return row |
639 | ||
8cdf0349 BP |
    def __error(self):
        # Treat a protocol error as fatal to this connection; reconnecting
        # re-fetches the database from scratch.
        self._session.force_reconnect()
642 | ||
643 | def __txn_abort_all(self): | |
644 | while self._outstanding_txns: | |
645 | txn = self._outstanding_txns.popitem()[1] | |
854a94d9 | 646 | txn._status = Transaction.TRY_AGAIN |
8cdf0349 BP |
647 | |
648 | def __txn_process_reply(self, msg): | |
649 | txn = self._outstanding_txns.pop(msg.id, None) | |
650 | if txn: | |
651 | txn._process_reply(msg) | |
beba3d82 | 652 | return True |
8cdf0349 | 653 | |
26bb0f31 | 654 | |
8cdf0349 BP |
655 | def _uuid_to_row(atom, base): |
656 | if base.ref_table: | |
657 | return base.ref_table.rows.get(atom) | |
658 | else: | |
659 | return atom | |
660 | ||
26bb0f31 | 661 | |
8cdf0349 | 662 | def _row_to_uuid(value): |
da2d45c6 | 663 | if isinstance(value, Row): |
8cdf0349 BP |
664 | return value.uuid |
665 | else: | |
666 | return value | |
bf6ec045 | 667 | |
26bb0f31 | 668 | |
fbafc3c2 | 669 | @functools.total_ordering |
bf6ec045 | 670 | class Row(object): |
8cdf0349 BP |
671 | """A row within an IDL. |
672 | ||
673 | The client may access the following attributes directly: | |
674 | ||
675 | - 'uuid': a uuid.UUID object whose value is the row's database UUID. | |
676 | ||
677 | - An attribute for each column in the Row's table, named for the column, | |
678 | whose values are as returned by Datum.to_python() for the column's type. | |
679 | ||
680 | If some error occurs (e.g. the database server's idea of the column is | |
681 | different from the IDL's idea), then the attribute values is the | |
682 | "default" value return by Datum.default() for the column's type. (It is | |
683 | important to know this because the default value may violate constraints | |
684 | for the column's type, e.g. the default integer value is 0 even if column | |
685 | contraints require the column's value to be positive.) | |
686 | ||
687 | When a transaction is active, column attributes may also be assigned new | |
688 | values. Committing the transaction will then cause the new value to be | |
689 | stored into the database. | |
690 | ||
691 | *NOTE*: In the current implementation, the value of a column is a *copy* | |
692 | of the value in the database. This means that modifying its value | |
693 | directly will have no useful effect. For example, the following: | |
694 | row.mycolumn["a"] = "b" # don't do this | |
695 | will not change anything in the database, even after commit. To modify | |
696 | the column, instead assign the modified column value back to the column: | |
697 | d = row.mycolumn | |
698 | d["a"] = "b" | |
699 | row.mycolumn = d | |
700 | """ | |
    def __init__(self, idl, table, uuid, data):
        # All of the explicit references to self.__dict__ below are required
        # to set real attributes without invoking self.__getattr__().
        self.__dict__["uuid"] = uuid

        self.__dict__["_idl"] = idl
        self.__dict__["_table"] = table

        # _data is the committed data.  It takes the following values:
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     exists in the committed form of the database.
        #
        #   - None, if this row is newly inserted within the active
        #     transaction and thus has no committed form.
        self.__dict__["_data"] = data

        # _changes describes changes to this row within the active
        # transaction.  It takes the following values:
        #
        #   - {}, the empty dictionary, if no transaction is active or if the
        #     row has not yet been changed within this transaction.
        #
        #   - A dictionary that maps a column name to its new Datum, if an
        #     active transaction changes those columns' values.
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     is newly inserted within the active transaction.
        #
        #   - None, if this transaction deletes this row.
        self.__dict__["_changes"] = {}

        # _mutations describes changes to this row to be handled via a
        # mutate operation on the wire.  It takes the following values:
        #
        #   - {}, the empty dictionary, if no transaction is active or if the
        #     row has not yet been mutated within this transaction.
        #
        #   - A dictionary that contains two keys:
        #
        #     - "_inserts" contains a dictionary that maps column names to
        #       new keys/key-value pairs that should be inserted into the
        #       column
        #     - "_removes" contains a dictionary that maps column names to
        #       the keys/key-value pairs that should be removed from the
        #       column
        #
        #   - None, if this transaction deletes this row.
        self.__dict__["_mutations"] = {}

        # A dictionary whose keys are the names of columns that must be
        # verified as prerequisites when the transaction commits.  The values
        # in the dictionary are all None.
        self.__dict__["_prereqs"] = {}
755 | ||
fbafc3c2 RB |
756 | def __lt__(self, other): |
757 | if not isinstance(other, Row): | |
758 | return NotImplemented | |
759 | return bool(self.__dict__['uuid'] < other.__dict__['uuid']) | |
760 | ||
761 | def __eq__(self, other): | |
762 | if not isinstance(other, Row): | |
763 | return NotImplemented | |
764 | return bool(self.__dict__['uuid'] == other.__dict__['uuid']) | |
765 | ||
766 | def __hash__(self): | |
767 | return int(self.__dict__['uuid']) | |
768 | ||
8cdf0349 BP |
    def __getattr__(self, column_name):
        """Returns the value of column 'column_name' as seen by the active
        transaction: the committed datum overlaid with any pending write
        (_changes) and any pending "mutate" inserts/removes (_mutations).

        Raises AttributeError if no value is available for the column."""
        assert self._changes is not None
        assert self._mutations is not None

        column = self._table.columns[column_name]
        # A pending plain write takes precedence over committed data and
        # mutations.
        datum = self._changes.get(column_name)
        inserts = None
        if '_inserts' in self._mutations.keys():
            inserts = self._mutations['_inserts'].get(column_name)
        removes = None
        if '_removes' in self._mutations.keys():
            removes = self._mutations['_removes'].get(column_name)
        if datum is None:
            if self._data is None:
                # Row inserted within this transaction: only pending inserts
                # can supply a value.
                if inserts is None:
                    raise AttributeError("%s instance has no attribute '%s'" %
                                         (self.__class__.__name__,
                                          column_name))
                else:
                    datum = data.Datum.from_python(column.type,
                                                   inserts,
                                                   _row_to_uuid)
            elif column_name in self._data:
                datum = self._data[column_name]
                if column.type.is_set():
                    # Overlay pending set mutations onto the committed list.
                    dlist = datum.as_list()
                    if inserts is not None:
                        dlist.extend(list(inserts))
                    if removes is not None:
                        removes_datum = data.Datum.from_python(column.type,
                                                               removes,
                                                               _row_to_uuid)
                        removes_list = removes_datum.as_list()
                        dlist = [x for x in dlist if x not in removes_list]
                    datum = data.Datum.from_python(column.type, dlist,
                                                   _row_to_uuid)
                elif column.type.is_map():
                    # Overlay pending map mutations onto the committed map.
                    dmap = datum.to_python(_uuid_to_row)
                    if inserts is not None:
                        dmap.update(inserts)
                    if removes is not None:
                        for key in removes:
                            # A key both removed and re-inserted keeps its
                            # newly inserted value (see Row.setkey()).
                            if key not in (inserts or {}):
                                del dmap[key]
                    datum = data.Datum.from_python(column.type, dmap,
                                                   _row_to_uuid)
            else:
                if inserts is None:
                    raise AttributeError("%s instance has no attribute '%s'" %
                                         (self.__class__.__name__,
                                          column_name))
                else:
                    # NOTE(review): here 'inserts' is the raw set/dict stored
                    # by addvalue()/setkey(), not a Datum, yet it flows into
                    # datum.to_python() below -- confirm this path cannot be
                    # reached, or convert via Datum.from_python() as above.
                    datum = inserts

        return datum.to_python(_uuid_to_row)
824 | ||
825 | def __setattr__(self, column_name, value): | |
826 | assert self._changes is not None | |
827 | assert self._idl.txn | |
828 | ||
80c12152 | 829 | if ((self._table.name in self._idl.readonly) and |
897c8064 | 830 | (column_name in self._idl.readonly[self._table.name])): |
37520ab3 RB |
831 | vlog.warn("attempting to write to readonly column %s" |
832 | % column_name) | |
80c12152 SA |
833 | return |
834 | ||
8cdf0349 BP |
835 | column = self._table.columns[column_name] |
836 | try: | |
a59912a0 | 837 | datum = data.Datum.from_python(column.type, value, _row_to_uuid) |
3ab76c56 | 838 | except error.Error as e: |
8cdf0349 | 839 | # XXX rate-limit |
3a656eaf EJ |
840 | vlog.err("attempting to write bad value to column %s (%s)" |
841 | % (column_name, e)) | |
8cdf0349 BP |
842 | return |
843 | self._idl.txn._write(self, column, datum) | |
844 | ||
a59912a0 RM |
845 | def addvalue(self, column_name, key): |
846 | self._idl.txn._txn_rows[self.uuid] = self | |
330b9c9c AB |
847 | column = self._table.columns[column_name] |
848 | try: | |
849 | data.Datum.from_python(column.type, key, _row_to_uuid) | |
850 | except error.Error as e: | |
851 | # XXX rate-limit | |
852 | vlog.err("attempting to write bad value to column %s (%s)" | |
853 | % (column_name, e)) | |
854 | return | |
855 | inserts = self._mutations.setdefault('_inserts', {}) | |
856 | column_value = inserts.setdefault(column_name, set()) | |
857 | column_value.add(key) | |
a59912a0 RM |
858 | |
859 | def delvalue(self, column_name, key): | |
860 | self._idl.txn._txn_rows[self.uuid] = self | |
330b9c9c AB |
861 | column = self._table.columns[column_name] |
862 | try: | |
863 | data.Datum.from_python(column.type, key, _row_to_uuid) | |
864 | except error.Error as e: | |
865 | # XXX rate-limit | |
866 | vlog.err("attempting to delete bad value from column %s (%s)" | |
867 | % (column_name, e)) | |
868 | return | |
869 | removes = self._mutations.setdefault('_removes', {}) | |
870 | column_value = removes.setdefault(column_name, set()) | |
871 | column_value.add(key) | |
a59912a0 RM |
872 | |
    def setkey(self, column_name, key, value):
        """Queues setting 'key' to 'value' within map column 'column_name',
        to be performed via an OVSDB "mutate" operation at commit time.  Any
        committed value for 'key' is first queued for removal so the insert
        replaces it.  Invalid key/value pairs are logged and dropped."""
        self._idl.txn._txn_rows[self.uuid] = self
        column = self._table.columns[column_name]
        try:
            # Validate the pair against the column type before queueing.
            data.Datum.from_python(column.type, {key: value}, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        if self._data and column_name in self._data:
            # Remove existing key/value before updating.
            removes = self._mutations.setdefault('_removes', {})
            column_value = removes.setdefault(column_name, set())
            column_value.add(key)
        inserts = self._mutations.setdefault('_inserts', {})
        column_value = inserts.setdefault(column_name, {})
        column_value[key] = value
891 | ||
892 | def delkey(self, column_name, key, value=None): | |
a59912a0 | 893 | self._idl.txn._txn_rows[self.uuid] = self |
330b9c9c AB |
894 | if value: |
895 | try: | |
896 | old_value = data.Datum.to_python(self._data[column_name], | |
897 | _uuid_to_row) | |
898 | except error.Error: | |
899 | return | |
900 | if key not in old_value: | |
901 | return | |
902 | if old_value[key] != value: | |
903 | return | |
904 | removes = self._mutations.setdefault('_removes', {}) | |
905 | column_value = removes.setdefault(column_name, set()) | |
906 | column_value.add(key) | |
a59912a0 RM |
907 | return |
908 | ||
d7d417fc TW |
909 | @classmethod |
910 | def from_json(cls, idl, table, uuid, row_json): | |
911 | data = {} | |
cb96c1b2 | 912 | for column_name, datum_json in six.iteritems(row_json): |
d7d417fc TW |
913 | column = table.columns.get(column_name) |
914 | if not column: | |
915 | # XXX rate-limit | |
916 | vlog.warn("unknown column %s in table %s" | |
917 | % (column_name, table.name)) | |
918 | continue | |
919 | try: | |
920 | datum = ovs.db.data.Datum.from_json(column.type, datum_json) | |
3ab76c56 | 921 | except error.Error as e: |
d7d417fc TW |
922 | # XXX rate-limit |
923 | vlog.warn("error parsing column %s in table %s: %s" | |
924 | % (column_name, table.name, e)) | |
925 | continue | |
926 | data[column_name] = datum | |
927 | return cls(idl, table, uuid, data) | |
928 | ||
8cdf0349 BP |
929 | def verify(self, column_name): |
930 | """Causes the original contents of column 'column_name' in this row to | |
931 | be verified as a prerequisite to completing the transaction. That is, | |
932 | if 'column_name' changed in this row (or if this row was deleted) | |
933 | between the time that the IDL originally read its contents and the time | |
934 | that the transaction commits, then the transaction aborts and | |
854a94d9 | 935 | Transaction.commit() returns Transaction.TRY_AGAIN. |
8cdf0349 BP |
936 | |
937 | The intention is that, to ensure that no transaction commits based on | |
938 | dirty reads, an application should call Row.verify() on each data item | |
939 | read as part of a read-modify-write operation. | |
940 | ||
941 | In some cases Row.verify() reduces to a no-op, because the current | |
942 | value of the column is already known: | |
943 | ||
944 | - If this row is a row created by the current transaction (returned | |
945 | by Transaction.insert()). | |
946 | ||
947 | - If the column has already been modified within the current | |
948 | transaction. | |
949 | ||
950 | Because of the latter property, always call Row.verify() *before* | |
951 | modifying the column, for a given read-modify-write. | |
952 | ||
953 | A transaction must be in progress.""" | |
954 | assert self._idl.txn | |
955 | assert self._changes is not None | |
956 | if not self._data or column_name in self._changes: | |
957 | return | |
958 | ||
959 | self._prereqs[column_name] = None | |
960 | ||
    def delete(self):
        """Deletes this row from its table.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if self._data is None:
            # Row was inserted within this same transaction; forget it
            # entirely so it is never sent to the server.
            del self._idl.txn._txn_rows[self.uuid]
        else:
            self._idl.txn._txn_rows[self.uuid] = self
        # A _changes of None marks this row as deleted.
        self.__dict__["_changes"] = None
        del self._table.rows[self.uuid]
973 | ||
80c12152 SA |
    def fetch(self, column_name):
        """Causes the transaction, when committed, to "select" the current
        value of 'column_name' in this row from the database server.

        A transaction must be in progress."""
        self._idl.txn._fetch(self, column_name)
976 | ||
94fbe1aa BP |
    def increment(self, column_name):
        """Causes the transaction, when committed, to increment the value of
        'column_name' within this row by 1.  'column_name' must have an
        integer type.  After the transaction commits successfully, the client
        may retrieve the final (incremented) value of 'column_name' with
        Transaction.get_increment_new_value().

        The client could accomplish something similar by reading and writing
        and verify()ing columns.  However, increment() will never (by itself)
        cause a transaction to fail because of a verify error.

        The intended use is for incrementing the "next_cfg" column in
        the Open_vSwitch table."""
        # Only one increment per transaction (see Transaction._increment()).
        self._idl.txn._increment(self, column_name)
991 | ||
26bb0f31 | 992 | |
8cdf0349 BP |
993 | def _uuid_name_from_uuid(uuid): |
994 | return "row%s" % str(uuid).replace("-", "_") | |
995 | ||
26bb0f31 | 996 | |
8cdf0349 BP |
997 | def _where_uuid_equals(uuid): |
998 | return [["_uuid", "==", ["uuid", str(uuid)]]] | |
999 | ||
26bb0f31 | 1000 | |
8cdf0349 BP |
1001 | class _InsertedRow(object): |
1002 | def __init__(self, op_index): | |
1003 | self.op_index = op_index | |
1004 | self.real = None | |
1005 | ||
26bb0f31 | 1006 | |
8cdf0349 | 1007 | class Transaction(object): |
2f926787 BP |
1008 | """A transaction may modify the contents of a database by modifying the |
1009 | values of columns, deleting rows, inserting rows, or adding checks that | |
1010 | columns in the database have not changed ("verify" operations), through | |
1011 | Row methods. | |
1012 | ||
1013 | Reading and writing columns and inserting and deleting rows are all | |
1014 | straightforward. The reasons to verify columns are less obvious. | |
1015 | Verification is the key to maintaining transactional integrity. Because | |
1016 | OVSDB handles multiple clients, it can happen that between the time that | |
1017 | OVSDB client A reads a column and writes a new value, OVSDB client B has | |
1018 | written that column. Client A's write should not ordinarily overwrite | |
1019 | client B's, especially if the column in question is a "map" column that | |
1020 | contains several more or less independent data items. If client A adds a | |
1021 | "verify" operation before it writes the column, then the transaction fails | |
1022 | in case client B modifies it first. Client A will then see the new value | |
1023 | of the column and compose a new transaction based on the new contents | |
1024 | written by client B. | |
1025 | ||
1026 | When a transaction is complete, which must be before the next call to | |
1027 | Idl.run(), call Transaction.commit() or Transaction.abort(). | |
1028 | ||
1029 | The life-cycle of a transaction looks like this: | |
1030 | ||
1031 | 1. Create the transaction and record the initial sequence number: | |
1032 | ||
1033 | seqno = idl.change_seqno | |
1034 | txn = Transaction(idl) | |
1035 | ||
1036 | 2. Modify the database with Row and Transaction methods. | |
1037 | ||
1038 | 3. Commit the transaction by calling Transaction.commit(). The first call | |
1039 | to this function probably returns Transaction.INCOMPLETE. The client | |
1040 | must keep calling again as long as this remains true, calling Idl.run() in | |
1041 | between to let the IDL do protocol processing. (If the client doesn't | |
1042 | have anything else to do in the meantime, it can use | |
1043 | Transaction.commit_block() to avoid having to loop itself.) | |
1044 | ||
1045 | 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno | |
1046 | to change from the saved 'seqno' (it's possible that it's already | |
1047 | changed, in which case the client should not wait at all), then start | |
1048 | over from step 1. Only a call to Idl.run() will change the return value | |
1049 | of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)""" | |
1050 | ||
    # Status values that Transaction.commit() can return.

    # Not yet committed or aborted.
    UNCOMMITTED = "uncommitted"
    # Transaction didn't include any changes.
    UNCHANGED = "unchanged"
    # Commit in progress, please wait.
    INCOMPLETE = "incomplete"
    # Transaction.abort() called.
    ABORTED = "aborted"
    # Commit successful.
    SUCCESS = "success"
    # Commit failed because a "verify" operation
    # reported an inconsistency, due to a network
    # problem, or other transient failure.  Wait
    # for a change, then try again.
    TRY_AGAIN = "try again"
    # Server hasn't given us the lock yet.
    NOT_LOCKED = "not locked"
    # Commit failed due to a hard error.
    ERROR = "error"
8cdf0349 BP |
1072 | |
    @staticmethod
    def status_to_string(status):
        """Converts one of the status values that Transaction.commit() can
        return into a human-readable string.

        (The status values are in fact such strings already, so
        there's nothing to do.)"""
        return status
1081 | ||
    def __init__(self, idl):
        """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
        A given Idl may only have a single active transaction at a time.

        A Transaction may modify the contents of a database by assigning new
        values to columns (attributes of Row), deleting rows (with
        Row.delete()), or inserting rows (with Transaction.insert()).  It may
        also check that columns in the database have not changed with
        Row.verify().

        When a transaction is complete (which must be before the next call to
        Idl.run()), call Transaction.commit() or Transaction.abort()."""
        assert idl.txn is None

        idl.txn = self
        self._request_id = None     # JSON-RPC id of the in-flight "transact".
        self.idl = idl
        self.dry_run = False        # If True, commit() appends an "abort" op.
        self._txn_rows = {}         # Map from UUID to Row touched by this txn.
        self._status = Transaction.UNCOMMITTED
        self._error = None          # First error's details, as a JSON string.
        self._comments = []         # Strings joined into a "comment" op.

        self._inc_row = None        # Row registered by Row.increment(), if any.
        self._inc_column = None     # Column name to increment in _inc_row.

        self._fetch_requests = []   # Dicts queued by Row.fetch().

        self._inserted_rows = {}  # Map from UUID to _InsertedRow
8cdf0349 BP |
1111 | |
    def add_comment(self, comment):
        """Appends 'comment' to the comments that will be passed to the OVSDB
        server when this transaction is committed.  (The comment will be
        committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
        relatively human-readable form.)"""
        self._comments.append(comment)
1118 | ||
8cdf0349 | 1119 | def wait(self, poller): |
2f926787 BP |
1120 | """Causes poll_block() to wake up if this transaction has completed |
1121 | committing.""" | |
8cdf0349 BP |
1122 | if self._status not in (Transaction.UNCOMMITTED, |
1123 | Transaction.INCOMPLETE): | |
1124 | poller.immediate_wake() | |
1125 | ||
    def _substitute_uuids(self, json):
        """Recursively rewrites, within 'json', any ["uuid", <string>]
        reference to a row inserted by this transaction into the
        ["named-uuid", <id>] form, since the server has not yet assigned
        such rows a permanent UUID."""
        if isinstance(json, (list, tuple)):
            if (len(json) == 2
                    and json[0] == 'uuid'
                    and ovs.ovsuuid.is_valid_string(json[1])):
                uuid = ovs.ovsuuid.from_string(json[1])
                row = self._txn_rows.get(uuid, None)
                # Only rows with no committed form (_data is None) need the
                # named-uuid indirection.
                if row and row._data is None:
                    return ["named-uuid", _uuid_name_from_uuid(uuid)]
            else:
                return [self._substitute_uuids(elem) for elem in json]
        return json
1138 | ||
    def __disassemble(self):
        """Rolls back all of this transaction's changes to the IDL's
        in-memory replica and detaches the transaction from the IDL."""
        self.idl.txn = None

        for row in six.itervalues(self._txn_rows):
            if row._changes is None:
                # Row was deleted by this transaction; restore it.
                row._table.rows[row.uuid] = row
            elif row._data is None:
                # Row was inserted by this transaction; remove it.
                del row._table.rows[row.uuid]
            row.__dict__["_changes"] = {}
            row.__dict__["_mutations"] = {}
            row.__dict__["_prereqs"] = {}
        self._txn_rows = {}
1151 | ||
    def commit(self):
        """Attempts to commit 'txn'.  Returns the status of the commit
        operation, one of the following constants:

          Transaction.INCOMPLETE:

              The transaction is in progress, but not yet complete.  The
              caller should call again later, after calling Idl.run() to let
              the IDL do OVSDB protocol processing.

          Transaction.UNCHANGED:

              The transaction is complete.  (It didn't actually change the
              database, so the IDL didn't send any request to the database
              server.)

          Transaction.ABORTED:

              The caller previously called Transaction.abort().

          Transaction.SUCCESS:

              The transaction was successful.  The update made by the
              transaction (and possibly other changes made by other database
              clients) should already be visible in the IDL.

          Transaction.TRY_AGAIN:

              The transaction failed for some transient reason, e.g. because
              a "verify" operation reported an inconsistency or due to a
              network problem.  The caller should wait for a change to the
              database, then compose a new transaction, and commit the new
              transaction.

              Use Idl.change_seqno to wait for a change in the database.  It
              is important to use its value *before* the initial call to
              Transaction.commit() as the baseline for this purpose, because
              the change that one should wait for can happen after the
              initial call but before the call that returns
              Transaction.TRY_AGAIN, and using some other baseline value in
              that situation could cause an indefinite wait if the database
              rarely changes.

          Transaction.NOT_LOCKED:

              The transaction failed because the IDL has been configured to
              require a database lock (with Idl.set_lock()) but didn't
              get it yet or has already lost it.

        Committing a transaction rolls back all of the changes that it made
        to the IDL's copy of the database.  If the transaction commits
        successfully, then the database server will send an update and, thus,
        the IDL will be updated with the committed changes."""
        # The status can only change if we're the active transaction.
        # (Otherwise, our status will change only in Idl.run().)
        if self != self.idl.txn:
            return self._status

        # If we need a lock but don't have it, give up quickly.
        if self.idl.lock_name and not self.idl.has_lock:
            self._status = Transaction.NOT_LOCKED
            self.__disassemble()
            return self._status

        # The first element of a "transact" request is the database name.
        operations = [self.idl._db.name]

        # Assert that we have the required lock (avoiding a race).
        if self.idl.lock_name:
            operations.append({"op": "assert",
                               "lock": self.idl.lock_name})

        # Add prerequisites and declarations of new rows.
        for row in six.itervalues(self._txn_rows):
            if row._prereqs:
                # "wait" with timeout 0 makes the whole transaction fail
                # unless the columns still hold their originally read values.
                rows = {}
                columns = []
                for column_name in row._prereqs:
                    columns.append(column_name)
                    rows[column_name] = row._data[column_name].to_json()
                operations.append({"op": "wait",
                                   "table": row._table.name,
                                   "timeout": 0,
                                   "where": _where_uuid_equals(row.uuid),
                                   "until": "==",
                                   "columns": columns,
                                   "rows": [rows]})

        # Add updates.
        any_updates = False
        for row in six.itervalues(self._txn_rows):
            if row._changes is None:
                # _changes of None marks a deleted row (see Row.delete()).
                if row._table.is_root:
                    operations.append({"op": "delete",
                                       "table": row._table.name,
                                       "where": _where_uuid_equals(row.uuid)})
                    any_updates = True
                else:
                    # Let ovsdb-server decide whether to really delete it.
                    pass
            elif row._changes:
                op = {"table": row._table.name}
                if row._data is None:
                    # Row has no committed form: emit an "insert".
                    op["op"] = "insert"
                    op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
                    any_updates = True

                    op_index = len(operations) - 1
                    self._inserted_rows[row.uuid] = _InsertedRow(op_index)
                else:
                    op["op"] = "update"
                    op["where"] = _where_uuid_equals(row.uuid)

                row_json = {}
                op["row"] = row_json

                for column_name, datum in six.iteritems(row._changes):
                    if row._data is not None or not datum.is_default():
                        row_json[column_name] = (
                            self._substitute_uuids(datum.to_json()))

                        # If anything really changed, consider it an update.
                        # We can't suppress not-really-changed values earlier
                        # or transactions would become nonatomic (see the big
                        # comment inside Transaction._write()).
                        if (not any_updates and row._data is not None and
                                row._data[column_name] != datum):
                            any_updates = True

                if row._data is None or row_json:
                    operations.append(op)
            if row._mutations:
                # Convert queued inserts/removes into a "mutate" operation.
                addop = False
                op = {"table": row._table.name}
                op["op"] = "mutate"
                if row._data is None:
                    # New row
                    op["where"] = self._substitute_uuids(
                        _where_uuid_equals(row.uuid))
                else:
                    # Existing row
                    op["where"] = _where_uuid_equals(row.uuid)
                op["mutations"] = []
                if '_removes' in row._mutations.keys():
                    for col, dat in six.iteritems(row._mutations['_removes']):
                        column = row._table.columns[col]
                        if column.type.is_map():
                            # Map removals are expressed as a set of keys.
                            opdat = ["set"]
                            opdat.append(list(dat))
                        else:
                            opdat = ["set"]
                            inner_opdat = []
                            for ele in dat:
                                try:
                                    datum = data.Datum.from_python(
                                        column.type, ele, _row_to_uuid)
                                except error.Error:
                                    # NOTE(review): bare return yields None
                                    # instead of a Transaction.* status, and
                                    # skips __disassemble() -- confirm
                                    # callers tolerate this.
                                    return
                                inner_opdat.append(
                                    self._substitute_uuids(datum.to_json()))
                            opdat.append(inner_opdat)
                        mutation = [col, "delete", opdat]
                        op["mutations"].append(mutation)
                        addop = True
                if '_inserts' in row._mutations.keys():
                    for col, val in six.iteritems(row._mutations['_inserts']):
                        column = row._table.columns[col]
                        if column.type.is_map():
                            opdat = ["map"]
                            datum = data.Datum.from_python(column.type, val,
                                                           _row_to_uuid)
                            opdat.append(datum.as_list())
                        else:
                            opdat = ["set"]
                            inner_opdat = []
                            for ele in val:
                                try:
                                    datum = data.Datum.from_python(
                                        column.type, ele, _row_to_uuid)
                                except error.Error:
                                    # NOTE(review): same bare return as in
                                    # the "_removes" branch above.
                                    return
                                inner_opdat.append(
                                    self._substitute_uuids(datum.to_json()))
                            opdat.append(inner_opdat)
                        mutation = [col, "insert", opdat]
                        op["mutations"].append(mutation)
                        addop = True
                if addop:
                    operations.append(op)
                    any_updates = True

        if self._fetch_requests:
            for fetch in self._fetch_requests:
                fetch["index"] = len(operations) - 1
                operations.append({"op": "select",
                                   "table": fetch["row"]._table.name,
                                   "where": self._substitute_uuids(
                                       _where_uuid_equals(fetch["row"].uuid)),
                                   "columns": [fetch["column_name"]]})
            any_updates = True

        # Add increment.
        if self._inc_row and any_updates:
            self._inc_index = len(operations) - 1

            operations.append({"op": "mutate",
                               "table": self._inc_row._table.name,
                               "where": self._substitute_uuids(
                                   _where_uuid_equals(self._inc_row.uuid)),
                               "mutations": [[self._inc_column, "+=", 1]]})
            # Read the incremented value back in the same transaction.
            operations.append({"op": "select",
                               "table": self._inc_row._table.name,
                               "where": self._substitute_uuids(
                                   _where_uuid_equals(self._inc_row.uuid)),
                               "columns": [self._inc_column]})

        # Add comment.
        if self._comments:
            operations.append({"op": "comment",
                               "comment": "\n".join(self._comments)})

        # Dry run?
        if self.dry_run:
            operations.append({"op": "abort"})

        if not any_updates:
            self._status = Transaction.UNCHANGED
        else:
            msg = ovs.jsonrpc.Message.create_request("transact", operations)
            self._request_id = msg.id
            # Session.send() returns 0 (falsy) on success.
            if not self.idl._session.send(msg):
                self.idl._outstanding_txns[self._request_id] = self
                self._status = Transaction.INCOMPLETE
            else:
                self._status = Transaction.TRY_AGAIN

        self.__disassemble()
        return self._status
1387 | ||
1388 | def commit_block(self): | |
2f926787 BP |
1389 | """Attempts to commit this transaction, blocking until the commit |
1390 | either succeeds or fails. Returns the final commit status, which may | |
1391 | be any Transaction.* value other than Transaction.INCOMPLETE. | |
1392 | ||
1393 | This function calls Idl.run() on this transaction'ss IDL, so it may | |
1394 | cause Idl.change_seqno to change.""" | |
8cdf0349 BP |
1395 | while True: |
1396 | status = self.commit() | |
1397 | if status != Transaction.INCOMPLETE: | |
1398 | return status | |
1399 | ||
1400 | self.idl.run() | |
1401 | ||
1402 | poller = ovs.poller.Poller() | |
1403 | self.idl.wait(poller) | |
1404 | self.wait(poller) | |
1405 | poller.block() | |
1406 | ||
    def get_increment_new_value(self):
        """Returns the final (incremented) value of the column in this
        transaction that was set to be incremented by Row.increment.  This
        transaction must have committed successfully."""
        assert self._status == Transaction.SUCCESS
        return self._inc_new_value
1413 | ||
1414 | def abort(self): | |
1415 | """Aborts this transaction. If Transaction.commit() has already been | |
1416 | called then the transaction might get committed anyhow.""" | |
1417 | self.__disassemble() | |
1418 | if self._status in (Transaction.UNCOMMITTED, | |
1419 | Transaction.INCOMPLETE): | |
1420 | self._status = Transaction.ABORTED | |
1421 | ||
1422 | def get_error(self): | |
1423 | """Returns a string representing this transaction's current status, | |
1424 | suitable for use in log messages.""" | |
1425 | if self._status != Transaction.ERROR: | |
1426 | return Transaction.status_to_string(self._status) | |
1427 | elif self._error: | |
1428 | return self._error | |
1429 | else: | |
1430 | return "no error details available" | |
1431 | ||
    def __set_error_json(self, json):
        """Records 'json' as this transaction's error details; only the
        first error reported is kept."""
        if self._error is None:
            self._error = ovs.json.to_string(json)
1435 | ||
1436 | def get_insert_uuid(self, uuid): | |
1437 | """Finds and returns the permanent UUID that the database assigned to a | |
1438 | newly inserted row, given the UUID that Transaction.insert() assigned | |
1439 | locally to that row. | |
1440 | ||
1441 | Returns None if 'uuid' is not a UUID assigned by Transaction.insert() | |
1442 | or if it was assigned by that function and then deleted by Row.delete() | |
1443 | within the same transaction. (Rows that are inserted and then deleted | |
1444 | within a single transaction are never sent to the database server, so | |
1445 | it never assigns them a permanent UUID.) | |
1446 | ||
1447 | This transaction must have completed successfully.""" | |
1448 | assert self._status in (Transaction.SUCCESS, | |
1449 | Transaction.UNCHANGED) | |
1450 | inserted_row = self._inserted_rows.get(uuid) | |
1451 | if inserted_row: | |
1452 | return inserted_row.real | |
1453 | return None | |
1454 | ||
94fbe1aa BP |
    def _increment(self, row, column):
        # Registers 'column' of 'row' for a "+=" mutate at commit time.
        # Only one increment is allowed per transaction.
        assert not self._inc_row
        self._inc_row = row
        self._inc_column = column
1459 | ||
    def _fetch(self, row, column_name):
        # Queues a "select" of 'column_name' from 'row', emitted when the
        # transaction commits (see Transaction.commit()).
        self._fetch_requests.append({"row": row, "column_name": column_name})
80c12152 | 1462 | |
8cdf0349 BP |
def _write(self, row, column, datum):
    """Record a write of 'datum' to 'column' of 'row' in this transaction."""
    assert row._changes is not None
    assert row._mutations is not None

    txn = row._idl.txn

    # If this is a write-only column and the datum being written is the
    # same as the one already there, just skip the update entirely.  This
    # is worth optimizing because we have a lot of columns that get
    # periodically refreshed into the database but don't actually change
    # that often.
    #
    # We don't do this for read/write columns because that would break
    # atomicity of transactions--some other client might have written a
    # different value in that column since we read it.  (But if a whole
    # transaction only does writes of existing values, without making any
    # real changes, we will drop the whole transaction later in
    # ovsdb_idl_txn_commit().)
    unchanged = (not column.alert and row._data
                 and row._data.get(column.name) == datum)
    if unchanged:
        pending = row._changes.get(column.name)
        if pending is None or pending == datum:
            return

    txn._txn_rows[row.uuid] = row
    # A plain write supersedes any partial-map/set mutations queued for
    # this column.
    for key in ('_inserts', '_removes'):
        if key in row._mutations:
            row._mutations[key].pop(column.name, None)
    row._changes[column.name] = datum.copy()
8cdf0349 BP |
1493 | |
def insert(self, table, new_uuid=None):
    """Insert and return a new row in 'table', which must be one of the
    ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.

    The new row is assigned a provisional UUID.  If 'new_uuid' is None then
    one is randomly generated; otherwise it should specify a randomly
    generated uuid.UUID not otherwise in use.  ovsdb-server will assign a
    different UUID when 'txn' is committed, but the IDL will replace any
    uses of the provisional UUID in the data to be committed by the UUID
    assigned by ovsdb-server."""
    assert self._status == Transaction.UNCOMMITTED
    provisional = uuid.uuid4() if new_uuid is None else new_uuid
    row = Row(self.idl, table, provisional, None)
    table.rows[row.uuid] = row
    self._txn_rows[row.uuid] = row
    return row
1511 | ||
def _process_reply(self, msg):
    """Digest the JSON-RPC reply 'msg' to this transaction's "transact"
    request and set the transaction's final status accordingly."""
    if msg.type == ovs.jsonrpc.Message.T_ERROR:
        self._status = Transaction.ERROR
        return
    if not isinstance(msg.result, (list, tuple)):
        # XXX rate-limit
        vlog.warn('reply to "transact" is not JSON array')
        return

    hard_errors = False
    soft_errors = False
    lock_errors = False

    ops = msg.result
    for op in ops:
        if op is None:
            # A null operation result isn't an error in itself but
            # indicates that some prior operation failed, so make sure
            # that we know about it.
            soft_errors = True
        elif isinstance(op, dict):
            error = op.get("error")
            if error == "timed out":
                soft_errors = True
            elif error == "not owner":
                lock_errors = True
            elif error == "aborted":
                pass
            elif error is not None:
                hard_errors = True
                self.__set_error_json(op)
        else:
            hard_errors = True
            self.__set_error_json(op)
            # XXX rate-limit
            vlog.warn("operation reply is not JSON null or object")

    # Only interpret increment/fetch/insert results when every operation
    # succeeded; otherwise their indexes don't line up with the reply.
    if not (soft_errors or hard_errors or lock_errors):
        if self._inc_row and not self.__process_inc_reply(ops):
            hard_errors = True
        if self._fetch_requests:
            if self.__process_fetch_reply(ops):
                self.idl.change_seqno += 1
            else:
                hard_errors = True

        for insert in six.itervalues(self._inserted_rows):
            if not self.__process_insert_reply(insert, ops):
                hard_errors = True

    if hard_errors:
        self._status = Transaction.ERROR
    elif lock_errors:
        self._status = Transaction.NOT_LOCKED
    elif soft_errors:
        self._status = Transaction.TRY_AGAIN
    else:
        self._status = Transaction.SUCCESS
1569 | ||
@staticmethod
def __check_json_type(json, types, name):
    """Return True if 'json' is present and an instance of one of 'types';
    otherwise log a warning mentioning 'name' and return False."""
    if not json:
        # XXX rate-limit
        vlog.warn("%s is missing" % name)
        return False
    if not isinstance(json, tuple(types)):
        # XXX rate-limit
        vlog.warn("%s has unexpected type %s" % (name, type(json)))
        return False
    return True
1582 | ||
80c12152 SA |
def __process_fetch_reply(self, ops):
    """Apply the "select" results for the queued fetch requests to their
    rows.

    Returns True if at least one column was updated, False on a malformed
    reply."""
    updated = False
    for request in self._fetch_requests:
        row = request["row"]
        column_name = request["column_name"]
        # NOTE(review): "index" is presumably filled in by commit() when
        # the "select" operation is appended -- confirm against caller.
        index = request["index"]
        table = row._table

        reply = ops[index]
        fetched_rows = reply.get("rows")
        if not Transaction.__check_json_type(fetched_rows, (list, tuple),
                                             '"select" reply "rows"'):
            return False
        if len(fetched_rows) != 1:
            # XXX rate-limit
            vlog.warn('"select" reply "rows" has %d elements '
                      'instead of 1' % len(fetched_rows))
            continue
        fetched_row = fetched_rows[0]
        if not Transaction.__check_json_type(fetched_row, (dict,),
                                             '"select" reply row'):
            continue

        column = table.columns.get(column_name)
        datum_json = fetched_row.get(column_name)
        datum = data.Datum.from_json(column.type, datum_json)

        row._data[column_name] = datum
        updated = True

    return updated
1614 | ||
8cdf0349 BP |
def __process_inc_reply(self, ops):
    """Extract the post-increment value from the "mutate"/"select" pair of
    operations that implement Transaction increment.

    Returns True on success (setting self._inc_new_value), False if the
    reply is malformed."""
    if self._inc_index + 2 > len(ops):
        # XXX rate-limit
        vlog.warn("reply does not contain enough operations for "
                  "increment (has %d, needs %d)" %
                  (len(ops), self._inc_index + 2))
        # Without this return, a short reply would fall through and index
        # past the end of 'ops' below, raising IndexError instead of
        # reporting failure.
        return False

    # We know that this is a JSON object because the loop in
    # __process_reply() already checked.
    mutate = ops[self._inc_index]
    count = mutate.get("count")
    if not Transaction.__check_json_type(count, six.integer_types,
                                         '"mutate" reply "count"'):
        return False
    if count != 1:
        # XXX rate-limit
        vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
        return False

    select = ops[self._inc_index + 1]
    rows = select.get("rows")
    if not Transaction.__check_json_type(rows, (list, tuple),
                                         '"select" reply "rows"'):
        return False
    if len(rows) != 1:
        # XXX rate-limit
        vlog.warn('"select" reply "rows" has %d elements '
                  'instead of 1' % len(rows))
        return False
    row = rows[0]
    if not Transaction.__check_json_type(row, (dict,),
                                         '"select" reply row'):
        return False
    column = row.get(self._inc_column)
    if not Transaction.__check_json_type(column, six.integer_types,
                                         '"select" reply inc column'):
        return False
    self._inc_new_value = column
    return True
1654 | ||
def __process_insert_reply(self, insert, ops):
    """Record the permanent UUID that the server assigned to inserted row
    'insert' by parsing the corresponding "insert" operation reply.

    Returns True on success, False if the reply is malformed."""
    if insert.op_index >= len(ops):
        # XXX rate-limit
        vlog.warn("reply does not contain enough operations "
                  "for insert (has %d, needs %d)"
                  % (len(ops), insert.op_index))
        return False

    # We know that this is a JSON object because the loop in
    # __process_reply() already checked.
    reply = ops[insert.op_index]
    json_uuid = reply.get("uuid")
    if not Transaction.__check_json_type(json_uuid, (tuple, list),
                                         '"insert" reply "uuid"'):
        return False

    try:
        real_uuid = ovs.ovsuuid.from_json(json_uuid)
    except error.Error:
        # XXX rate-limit
        vlog.warn('"insert" reply "uuid" is not a JSON UUID')
        return False

    insert.real = real_uuid
    return True
ad0991e6 EJ |
1680 | |
1681 | ||
class SchemaHelper(object):
    """IDL Schema helper.

    This class encapsulates the logic required to generate schemas suitable
    for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
    they are interested in using register_columns().  When finished, the
    get_idl_schema() function may be called.

    The location on disk of the schema used may be found in the
    'schema_location' variable."""

    def __init__(self, location=None, schema_json=None):
        """Creates a new Schema object.

        'location' file path to ovs schema. None means default location
        'schema_json' schema in json representation in memory
        """

        if location and schema_json:
            raise ValueError("both location and schema_json can't be "
                             "specified. it's ambiguous.")
        if schema_json is None:
            if location is None:
                location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
            schema_json = ovs.json.from_file(location)

        self.schema_json = schema_json
        self._tables = {}       # table name -> set of column names
        self._readonly = {}     # table name -> list of read-only columns
        self._all = False       # True when every table/column is wanted

    def register_columns(self, table, columns, readonly=None):
        """Registers interest in the given 'columns' of 'table'.  Future calls
        to get_idl_schema() will include 'table':column for each column in
        'columns'.  This function automatically avoids adding duplicate
        entries to the schema.
        A subset of 'columns' can be specified as 'readonly'.  The readonly
        columns are not replicated but can be fetched on-demand by the user
        with Row.fetch().

        'table' must be a string.
        'columns' must be a list of strings.
        'readonly' must be a list of strings.
        """

        assert isinstance(table, six.string_types)
        assert isinstance(columns, list)

        # Use a None sentinel instead of a mutable [] default so the
        # (shared) default object can never be stored or mutated.
        if readonly is None:
            readonly = []

        columns = set(columns) | self._tables.get(table, set())
        self._tables[table] = columns
        self._readonly[table] = readonly

    def register_table(self, table):
        """Registers interest in all columns of 'table'.  Future calls to
        get_idl_schema() will include all columns of 'table'.

        'table' must be a string
        """
        assert isinstance(table, six.string_types)
        self._tables[table] = set()  # empty set means all columns in the table

    def register_all(self):
        """Registers interest in every column of every table."""
        self._all = True

    def get_idl_schema(self):
        """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
        object based on columns registered using the register_columns()
        function."""

        schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
        # The JSON is consumed; further calls would fail.
        self.schema_json = None

        if not self._all:
            schema_tables = {}
            for table, columns in six.iteritems(self._tables):
                schema_tables[table] = (
                    self._keep_table_columns(schema, table, columns))

            schema.tables = schema_tables
        schema.readonly = self._readonly
        return schema

    def _keep_table_columns(self, schema, table_name, columns):
        # Prune 'schema' table 'table_name' down to just 'columns' (an
        # empty set means keep every column).
        assert table_name in schema.tables
        table = schema.tables[table_name]

        if not columns:
            # empty set means all columns in the table
            return table

        new_columns = {}
        for column_name in columns:
            assert isinstance(column_name, six.string_types)
            assert column_name in table.columns

            new_columns[column_name] = table.columns[column_name]

        table.columns = new_columns
        return table