]>
Commit | Line | Data |
---|---|---|
0164e367 | 1 | # Copyright (c) 2009, 2010, 2011, 2012, 2013, 2016 Nicira, Inc. |
99155935 BP |
2 | # |
3 | # Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | # you may not use this file except in compliance with the License. | |
5 | # You may obtain a copy of the License at: | |
6 | # | |
7 | # http://www.apache.org/licenses/LICENSE-2.0 | |
8 | # | |
9 | # Unless required by applicable law or agreed to in writing, software | |
10 | # distributed under the License is distributed on an "AS IS" BASIS, | |
11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | # See the License for the specific language governing permissions and | |
13 | # limitations under the License. | |
14 | ||
fbafc3c2 | 15 | import functools |
8cdf0349 | 16 | import uuid |
99155935 | 17 | |
a59912a0 | 18 | import ovs.db.data as data |
4c0f6271 | 19 | import ovs.db.parser |
99155935 | 20 | import ovs.db.schema |
6c7050b5 | 21 | import ovs.jsonrpc |
99155935 | 22 | import ovs.ovsuuid |
8cdf0349 | 23 | import ovs.poller |
3a656eaf | 24 | import ovs.vlog |
6c7050b5 | 25 | from ovs.db import error |
26 | ||
27 | import six | |
3a656eaf EJ |
28 | |
# Module-wide logger for the IDL.
vlog = ovs.vlog.Vlog("idl")

# Legacy pychecker directive; harmless under other tools.
__pychecker__ = 'no-classattr no-objattrs'

# Event names passed to Idl.notify().
ROW_CREATE = "create"
ROW_UPDATE = "update"
ROW_DELETE = "delete"

# Wire-format selectors for Idl.__parse_update(): the older "update"
# notification format vs. the newer "update2" (diff-based) format.
OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1
39 | ||
d7d417fc TW |
40 | |
class Idl(object):
    """Open vSwitch Database Interface Definition Language (OVSDB IDL).

    The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
    requests to an OVSDB database server and parses the responses, converting
    raw JSON into data structures that are easier for clients to digest.

    The IDL also assists with issuing database transactions.  The client
    creates a transaction, manipulates the IDL data structures, and commits or
    aborts the transaction.  The IDL then composes and issues the necessary
    JSON-RPC requests and reports to the client whether the transaction
    completed successfully.

    The client is allowed to access the following attributes directly, in a
    read-only fashion:

    - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
      to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
      annotated with a new attribute 'rows', which is a dict from a uuid.UUID
      to a Row object.

      The client may directly read and write the Row objects referenced by the
      'rows' map values.  Refer to Row for more details.

    - 'change_seqno': A number that represents the IDL's state.  When the IDL
      is updated (by Idl.run()), its value changes.  The sequence number can
      occasionally change even if the database does not.  This happens if the
      connection to the database drops and reconnects, which causes the
      database contents to be reloaded even if they didn't change.  (It could
      also happen if the database server sends out a "change" that reflects
      what the IDL already thought was in the database.  The database server is
      not supposed to do that, but bugs could in theory cause it to do so.)

    - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
      if no lock is configured.

    - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
      lock, and False otherwise.

      Locking and unlocking happens asynchronously from the database client's
      point of view, so the information is only useful for optimization
      (e.g. if the client doesn't have the lock then there's no point in trying
      to write to the database).

    - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
      the database server has indicated that some other client already owns the
      requested lock, and False otherwise.

    - 'txn': The ovs.db.idl.Transaction object for the database transaction
      currently being constructed, if there is one, or None otherwise.
    """

    # Monitor-request state machine (see __send_monitor_request and run):
    # start in INITIAL, try "monitor_cond" first, fall back to plain
    # "monitor" if the server rejects it with "unknown method".
    IDL_S_INITIAL = 0
    IDL_S_MONITOR_REQUESTED = 1
    IDL_S_MONITOR_COND_REQUESTED = 2
    def __init__(self, remote, schema, probe_interval=None):
        """Creates and returns a connection to the database named 'db_name' on
        'remote', which should be in a form acceptable to
        ovs.jsonrpc.session.open().  The connection will maintain an in-memory
        replica of the remote database.

        'schema' should be the schema for the remote database.  The caller may
        have cut it down by removing tables or columns that are not of
        interest.  The IDL will only replicate the tables and columns that
        remain.  The caller may also add a attribute named 'alert' to selected
        remaining columns, setting its value to False; if so, then changes to
        those columns will not be considered changes to the database for the
        purpose of the return value of Idl.run() and Idl.change_seqno.  This is
        useful for columns that the IDL's client will write but not read.

        As a convenience to users, 'schema' may also be an instance of the
        SchemaHelper class.

        The IDL uses and modifies 'schema' directly.

        If "probe_interval" is zero it disables the connection keepalive
        feature.  If non-zero the value will be forced to at least 1000
        milliseconds.  If None it will just use the default value in OVS.
        """

        assert isinstance(schema, SchemaHelper)
        schema = schema.get_idl_schema()

        self.tables = schema.tables
        self.readonly = schema.readonly
        self._db = schema
        self._session = ovs.jsonrpc.Session.open(remote,
                                                 probe_interval=probe_interval)
        self._monitor_request_id = None
        self._last_seqno = None
        self.change_seqno = 0
        # Unique id identifying this monitor to the server; rotated by
        # __send_cond_change() on each condition change.
        self.uuid = uuid.uuid1()
        self.state = self.IDL_S_INITIAL

        # Database locking.
        self.lock_name = None          # Name of lock we need, None if none.
        self.has_lock = False          # Has db server said we have the lock?
        self.is_lock_contended = False  # Has db server said we can't get lock?
        self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.

        # Transaction support.
        self.txn = None
        self._outstanding_txns = {}

        # Annotate the schema objects with IDL bookkeeping state.
        for table in six.itervalues(schema.tables):
            for column in six.itervalues(table.columns):
                if not hasattr(column, 'alert'):
                    column.alert = True
            table.need_table = False
            table.rows = {}
            table.idl = self
            table.condition = [True]   # [True] == replicate every row.
            table.cond_changed = False
99155935 BP |
155 | |
    def close(self):
        """Closes the connection to the database.  The IDL will no longer
        update."""
        self._session.close()
99155935 BP |
160 | |
    def run(self):
        """Processes a batch of messages from the database server.  Returns
        True if the database as seen through the IDL changed, False if it did
        not change.  The initial fetch of the entire contents of the remote
        database is considered to be one kind of change.  If the IDL has been
        configured to acquire a database lock (with Idl.set_lock()), then
        successfully acquiring the lock is also considered to be a change.

        This function can return occasional false positives, that is, report
        that the database changed even though it didn't.  This happens if the
        connection to the database drops and reconnects, which causes the
        database contents to be reloaded even if they didn't change.  (It could
        also happen if the database server sends out a "change" that reflects
        what we already thought was in the database, but the database server is
        not supposed to do that.)

        As an alternative to checking the return value, the client may check
        for changes in self.change_seqno."""
        assert not self.txn
        initial_change_seqno = self.change_seqno

        self.send_cond_change()
        self._session.run()
        # Bound the work done per call: process at most 50 messages.
        i = 0
        while i < 50:
            i += 1
            if not self._session.is_connected():
                break

            seqno = self._session.get_seqno()
            if seqno != self._last_seqno:
                # The session reconnected: abort in-flight transactions and
                # re-request the monitor (and lock, if configured).
                self._last_seqno = seqno
                self.__txn_abort_all()
                self.__send_monitor_request()
                if self.lock_name:
                    self.__send_lock_request()
                break

            msg = self._session.recv()
            if msg is None:
                break
            if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update2"
                    and len(msg.params) == 2):
                # Database contents changed.
                self.__parse_update(msg.params[1], OVSDB_UPDATE2)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update"
                    and len(msg.params) == 2):
                # Database contents changed.
                self.__parse_update(msg.params[1], OVSDB_UPDATE)
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._monitor_request_id is not None
                    and self._monitor_request_id == msg.id):
                # Reply to our "monitor" request: the initial snapshot.
                try:
                    self.change_seqno += 1
                    self._monitor_request_id = None
                    self.__clear()
                    if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
                        self.__parse_update(msg.result, OVSDB_UPDATE2)
                    else:
                        assert self.state == self.IDL_S_MONITOR_REQUESTED
                        self.__parse_update(msg.result, OVSDB_UPDATE)

                except error.Error as e:
                    vlog.err("%s: parse error in received schema: %s"
                             % (self._session.get_name(), e))
                    self.__error()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._lock_request_id is not None
                    and self._lock_request_id == msg.id):
                # Reply to our "lock" request.
                self.__parse_lock_reply(msg.result)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "locked"):
                # We got our lock.
                self.__parse_lock_notify(msg.params, True)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "stolen"):
                # Someone else stole our lock.
                self.__parse_lock_notify(msg.params, False)
            elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
                # Reply to our echo request.  Ignore it.
                pass
            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                  self.state == self.IDL_S_MONITOR_COND_REQUESTED and
                  self._monitor_request_id == msg.id):
                if msg.error == "unknown method":
                    # Server does not support "monitor_cond"; fall back to
                    # plain "monitor" (state machine advances in
                    # __send_monitor_request).
                    self.__send_monitor_request()
            elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                               ovs.jsonrpc.Message.T_REPLY)
                    and self.__txn_process_reply(msg)):
                # __txn_process_reply() did everything needed.
                pass
            else:
                # This can happen if a transaction is destroyed before we
                # receive the reply, so keep the log level low.
                vlog.dbg("%s: received unexpected %s message"
                         % (self._session.get_name(),
                            ovs.jsonrpc.Message.type_to_string(msg.type)))

        return initial_change_seqno != self.change_seqno
264 | ||
16ebb90e LS |
265 | def send_cond_change(self): |
266 | if not self._session.is_connected(): | |
267 | return | |
268 | ||
269 | for table in six.itervalues(self.tables): | |
270 | if table.cond_changed: | |
271 | self.__send_cond_change(table, table.condition) | |
272 | table.cond_changed = False | |
273 | ||
0164e367 BP |
274 | def cond_change(self, table_name, cond): |
275 | """Sets the condition for 'table_name' to 'cond', which should be a | |
276 | conditional expression suitable for use directly in the OVSDB | |
277 | protocol, with the exception that the empty condition [] | |
278 | matches no rows (instead of matching every row). That is, [] | |
279 | is equivalent to [False], not to [True]. | |
280 | """ | |
16ebb90e | 281 | |
897c8064 LS |
282 | table = self.tables.get(table_name) |
283 | if not table: | |
284 | raise error.Error('Unknown table "%s"' % table_name) | |
16ebb90e | 285 | |
0164e367 BP |
286 | if cond == []: |
287 | cond = [False] | |
288 | if table.condition != cond: | |
289 | table.condition = cond | |
290 | table.cond_changed = True | |
897c8064 | 291 | |
99155935 BP |
    def wait(self, poller):
        """Arranges for poller.block() to wake up when self.run() has something
        to do or when activity occurs on a transaction on 'self'."""
        self._session.wait(poller)
        self._session.recv_wait(poller)
99155935 | 297 | |
8cdf0349 BP |
298 | def has_ever_connected(self): |
299 | """Returns True, if the IDL successfully connected to the remote | |
300 | database and retrieved its contents (even if the connection | |
301 | subsequently dropped and is in the process of reconnecting). If so, | |
302 | then the IDL contains an atomic snapshot of the database's contents | |
303 | (but it might be arbitrarily old if the connection dropped). | |
304 | ||
305 | Returns False if the IDL has never connected or retrieved the | |
306 | database's contents. If so, the IDL is empty.""" | |
307 | return self.change_seqno != 0 | |
308 | ||
    def force_reconnect(self):
        """Forces the IDL to drop its connection to the database and reconnect.
        In the meantime, the contents of the IDL will not change."""
        self._session.force_reconnect()
313 | ||
    def set_lock(self, lock_name):
        """If 'lock_name' is not None, configures the IDL to obtain the named
        lock from the database server and to avoid modifying the database when
        the lock cannot be acquired (that is, when another client has the same
        lock).

        If 'lock_name' is None, drops the locking requirement and releases the
        lock."""
        # Changing the lock mid-transaction is not supported.
        assert not self.txn
        assert not self._outstanding_txns

        if self.lock_name and (not lock_name or lock_name != self.lock_name):
            # Release previous lock.
            self.__send_unlock_request()
            self.lock_name = None
            self.is_lock_contended = False

        if lock_name and not self.lock_name:
            # Acquire new lock.
            self.lock_name = lock_name
            self.__send_lock_request()
335 | ||
d7d417fc TW |
    def notify(self, event, row, updates=None):
        """Hook for implementing create/update/delete notifications

        The base implementation is deliberately a no-op; subclasses override
        it to observe changes applied by Idl.run().

        :param event: The event that was triggered
        :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
        :param row: The row as it is after the operation has occurred
        :type row: Row
        :param updates: For updates, row with only old values of the changed
                        columns
        :type updates: Row
        """
347 | ||
897c8064 LS |
348 | def __send_cond_change(self, table, cond): |
349 | monitor_cond_change = {table.name: [{"where": cond}]} | |
350 | old_uuid = str(self.uuid) | |
351 | self.uuid = uuid.uuid1() | |
352 | params = [old_uuid, str(self.uuid), monitor_cond_change] | |
353 | msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params) | |
354 | self._session.send(msg) | |
355 | ||
8cdf0349 BP |
356 | def __clear(self): |
357 | changed = False | |
358 | ||
cb96c1b2 | 359 | for table in six.itervalues(self.tables): |
8cdf0349 BP |
360 | if table.rows: |
361 | changed = True | |
362 | table.rows = {} | |
99155935 | 363 | |
8cdf0349 BP |
364 | if changed: |
365 | self.change_seqno += 1 | |
366 | ||
367 | def __update_has_lock(self, new_has_lock): | |
368 | if new_has_lock and not self.has_lock: | |
369 | if self._monitor_request_id is None: | |
370 | self.change_seqno += 1 | |
371 | else: | |
372 | # We're waiting for a monitor reply, so don't signal that the | |
373 | # database changed. The monitor reply will increment | |
374 | # change_seqno anyhow. | |
375 | pass | |
376 | self.is_lock_contended = False | |
377 | self.has_lock = new_has_lock | |
378 | ||
379 | def __do_send_lock_request(self, method): | |
380 | self.__update_has_lock(False) | |
381 | self._lock_request_id = None | |
382 | if self._session.is_connected(): | |
383 | msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name]) | |
384 | msg_id = msg.id | |
385 | self._session.send(msg) | |
386 | else: | |
387 | msg_id = None | |
388 | return msg_id | |
389 | ||
    def __send_lock_request(self):
        # Request self.lock_name; remember the request id so run() can match
        # the server's reply.
        self._lock_request_id = self.__do_send_lock_request("lock")

    def __send_unlock_request(self):
        # Release self.lock_name; the reply is not tracked.
        self.__do_send_lock_request("unlock")
395 | ||
396 | def __parse_lock_reply(self, result): | |
397 | self._lock_request_id = None | |
da2d45c6 | 398 | got_lock = isinstance(result, dict) and result.get("locked") is True |
8cdf0349 BP |
399 | self.__update_has_lock(got_lock) |
400 | if not got_lock: | |
401 | self.is_lock_contended = True | |
402 | ||
403 | def __parse_lock_notify(self, params, new_has_lock): | |
404 | if (self.lock_name is not None | |
da2d45c6 | 405 | and isinstance(params, (list, tuple)) |
8cdf0349 BP |
406 | and params |
407 | and params[0] == self.lock_name): | |
e9ad9219 | 408 | self.__update_has_lock(new_has_lock) |
8cdf0349 BP |
409 | if not new_has_lock: |
410 | self.is_lock_contended = True | |
99155935 BP |
411 | |
412 | def __send_monitor_request(self): | |
897c8064 LS |
413 | if self.state == self.IDL_S_INITIAL: |
414 | self.state = self.IDL_S_MONITOR_COND_REQUESTED | |
415 | method = "monitor_cond" | |
416 | else: | |
417 | self.state = self.IDL_S_MONITOR_REQUESTED | |
418 | method = "monitor" | |
419 | ||
99155935 | 420 | monitor_requests = {} |
cb96c1b2 | 421 | for table in six.itervalues(self.tables): |
80c12152 | 422 | columns = [] |
cb96c1b2 | 423 | for column in six.iterkeys(table.columns): |
80c12152 | 424 | if ((table.name not in self.readonly) or |
897c8064 LS |
425 | (table.name in self.readonly) and |
426 | (column not in self.readonly[table.name])): | |
80c12152 | 427 | columns.append(column) |
62bba609 | 428 | monitor_request = {"columns": columns} |
0164e367 | 429 | if method == "monitor_cond" and table.condition != [True]: |
62bba609 | 430 | monitor_request["where"] = table.condition |
16ebb90e | 431 | table.cond_change = False |
62bba609 | 432 | monitor_requests[table.name] = [monitor_request] |
897c8064 | 433 | |
99155935 | 434 | msg = ovs.jsonrpc.Message.create_request( |
897c8064 | 435 | method, [self._db.name, str(self.uuid), monitor_requests]) |
8cdf0349 BP |
436 | self._monitor_request_id = msg.id |
437 | self._session.send(msg) | |
99155935 | 438 | |
897c8064 | 439 | def __parse_update(self, update, version): |
99155935 | 440 | try: |
897c8064 | 441 | self.__do_parse_update(update, version) |
3ab76c56 | 442 | except error.Error as e: |
3a656eaf EJ |
443 | vlog.err("%s: error parsing update: %s" |
444 | % (self._session.get_name(), e)) | |
99155935 | 445 | |
897c8064 | 446 | def __do_parse_update(self, table_updates, version): |
da2d45c6 | 447 | if not isinstance(table_updates, dict): |
99155935 BP |
448 | raise error.Error("<table-updates> is not an object", |
449 | table_updates) | |
450 | ||
cb96c1b2 | 451 | for table_name, table_update in six.iteritems(table_updates): |
8cdf0349 | 452 | table = self.tables.get(table_name) |
99155935 | 453 | if not table: |
f2d8ad13 BP |
454 | raise error.Error('<table-updates> includes unknown ' |
455 | 'table "%s"' % table_name) | |
99155935 | 456 | |
da2d45c6 | 457 | if not isinstance(table_update, dict): |
f2d8ad13 BP |
458 | raise error.Error('<table-update> for table "%s" is not ' |
459 | 'an object' % table_name, table_update) | |
99155935 | 460 | |
cb96c1b2 | 461 | for uuid_string, row_update in six.iteritems(table_update): |
49c541dc | 462 | if not ovs.ovsuuid.is_valid_string(uuid_string): |
f2d8ad13 BP |
463 | raise error.Error('<table-update> for table "%s" ' |
464 | 'contains bad UUID "%s" as member ' | |
465 | 'name' % (table_name, uuid_string), | |
99155935 | 466 | table_update) |
49c541dc | 467 | uuid = ovs.ovsuuid.from_string(uuid_string) |
99155935 | 468 | |
da2d45c6 | 469 | if not isinstance(row_update, dict): |
f2d8ad13 BP |
470 | raise error.Error('<table-update> for table "%s" ' |
471 | 'contains <row-update> for %s that ' | |
472 | 'is not an object' | |
99155935 BP |
473 | % (table_name, uuid_string)) |
474 | ||
897c8064 LS |
475 | if version == OVSDB_UPDATE2: |
476 | if self.__process_update2(table, uuid, row_update): | |
477 | self.change_seqno += 1 | |
478 | continue | |
479 | ||
af1eba26 | 480 | parser = ovs.db.parser.Parser(row_update, "row-update") |
4c0f6271 BP |
481 | old = parser.get_optional("old", [dict]) |
482 | new = parser.get_optional("new", [dict]) | |
483 | parser.finish() | |
99155935 | 484 | |
99155935 | 485 | if not old and not new: |
f2d8ad13 BP |
486 | raise error.Error('<row-update> missing "old" and ' |
487 | '"new" members', row_update) | |
99155935 | 488 | |
8cdf0349 | 489 | if self.__process_update(table, uuid, old, new): |
99155935 BP |
490 | self.change_seqno += 1 |
491 | ||
897c8064 LS |
492 | def __process_update2(self, table, uuid, row_update): |
493 | row = table.rows.get(uuid) | |
494 | changed = False | |
495 | if "delete" in row_update: | |
496 | if row: | |
497 | del table.rows[uuid] | |
498 | self.notify(ROW_DELETE, row) | |
499 | changed = True | |
500 | else: | |
501 | # XXX rate-limit | |
502 | vlog.warn("cannot delete missing row %s from table" | |
503 | "%s" % (uuid, table.name)) | |
504 | elif "insert" in row_update or "initial" in row_update: | |
505 | if row: | |
506 | vlog.warn("cannot add existing row %s from table" | |
507 | " %s" % (uuid, table.name)) | |
508 | del table.rows[uuid] | |
509 | row = self.__create_row(table, uuid) | |
510 | if "insert" in row_update: | |
511 | row_update = row_update['insert'] | |
512 | else: | |
513 | row_update = row_update['initial'] | |
514 | self.__add_default(table, row_update) | |
515 | if self.__row_update(table, row, row_update): | |
516 | changed = True | |
517 | self.notify(ROW_CREATE, row) | |
518 | elif "modify" in row_update: | |
519 | if not row: | |
520 | raise error.Error('Modify non-existing row') | |
521 | ||
eab13876 DA |
522 | old_row = self.__apply_diff(table, row, row_update['modify']) |
523 | self.notify(ROW_UPDATE, row, Row(self, table, uuid, old_row)) | |
897c8064 LS |
524 | changed = True |
525 | else: | |
526 | raise error.Error('<row-update> unknown operation', | |
527 | row_update) | |
528 | return changed | |
529 | ||
    def __process_update(self, table, uuid, old, new):
        """Returns True if a column changed, False otherwise."""
        row = table.rows.get(uuid)
        changed = False
        if not new:
            # Delete row.
            if row:
                del table.rows[uuid]
                changed = True
                self.notify(ROW_DELETE, row)
            else:
                # XXX rate-limit
                vlog.warn("cannot delete missing row %s from table %s"
                          % (uuid, table.name))
        elif not old:
            # Insert row.
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
            else:
                # XXX rate-limit
                vlog.warn("cannot add existing row %s to table %s"
                          % (uuid, table.name))
            if self.__row_update(table, row, new):
                changed = True
                self.notify(ROW_CREATE, row)
        else:
            # Modify row.  A missing row is tolerated (and synthesized) so a
            # single bad update does not wedge replication.
            op = ROW_UPDATE
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
                op = ROW_CREATE
                # XXX rate-limit
                vlog.warn("cannot modify missing row %s in table %s"
                          % (uuid, table.name))
            if self.__row_update(table, row, new):
                changed = True
                self.notify(op, row, Row.from_json(self, table, uuid, old))
        return changed
99155935 | 569 | |
897c8064 LS |
    def __column_name(self, column):
        # Despite its name, this returns the default *value* for 'column'
        # (JSON-encoded for UUID-typed columns); it is used by __add_default
        # to synthesize omitted required columns.
        if column.type.key.type == ovs.db.types.UuidType:
            return ovs.ovsuuid.to_json(column.type.key.type.default)
        else:
            return column.type.key.type.default
575 | ||
    def __add_default(self, table, row_update):
        # Fill in a default value for every required (n_min != 0), non-map
        # column that the server omitted from an "insert"/"initial"
        # <row-update2>, skipping columns registered as read-only for this
        # table.  Mutates 'row_update' in place.
        for column in six.itervalues(table.columns):
            if column.name not in row_update:
                if ((table.name not in self.readonly) or
                        (table.name in self.readonly) and
                        (column.name not in self.readonly[table.name])):
                    if column.type.n_min != 0 and not column.type.is_map():
                        row_update[column.name] = self.__column_name(column)
584 | ||
    def __apply_diff(self, table, row, row_diff):
        # Applies a <row-update2> "modify" diff to 'row' in place and returns
        # a dict mapping each diffed column name to a copy of its previous
        # Datum (used to build the old-values Row passed to notify()).
        # Unknown columns and unparseable datums are logged and skipped.
        old_row = {}
        for column_name, datum_diff_json in six.iteritems(row_diff):
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum_diff = data.Datum.from_json(column.type, datum_diff_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            old_row[column_name] = row._data[column_name].copy()
            datum = row._data[column_name].diff(datum_diff)
            if datum != row._data[column_name]:
                row._data[column_name] = datum

        return old_row
a7261bf7 | 609 | |
    def __row_update(self, table, row, row_json):
        # Overwrites 'row' columns from the JSON object 'row_json'.  Returns
        # True only if a column whose 'alert' flag is set actually changed
        # value; unknown columns and unparseable datums are logged and
        # skipped.
        changed = False
        for column_name, datum_json in six.iteritems(row_json):
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum = data.Datum.from_json(column.type, datum_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            if datum != row._data[column_name]:
                row._data[column_name] = datum
                if column.alert:
                    changed = True
            else:
                # Didn't really change but the OVSDB monitor protocol always
                # includes every value in a row.
                pass
        return changed
637 | ||
99155935 | 638 | def __create_row(self, table, uuid): |
8cdf0349 | 639 | data = {} |
cb96c1b2 | 640 | for column in six.itervalues(table.columns): |
8cdf0349 BP |
641 | data[column.name] = ovs.db.data.Datum.default(column.type) |
642 | row = table.rows[uuid] = Row(self, table, uuid, data) | |
99155935 BP |
643 | return row |
644 | ||
8cdf0349 BP |
    def __error(self):
        # A protocol-level error is unrecoverable in place: drop the session
        # so reconnection re-fetches the database from scratch.
        self._session.force_reconnect()

    def __txn_abort_all(self):
        # Flag every outstanding transaction for retry (used when the session
        # resets and the fate of in-flight requests is unknown).
        while self._outstanding_txns:
            txn = self._outstanding_txns.popitem()[1]
            txn._status = Transaction.TRY_AGAIN

    def __txn_process_reply(self, msg):
        # Dispatch a reply/error to the outstanding transaction with matching
        # id, if any.  Returns True when the message was consumed (and falsy
        # None otherwise, which run() treats as "not handled").
        txn = self._outstanding_txns.pop(msg.id, None)
        if txn:
            txn._process_reply(msg)
            return True
8cdf0349 | 658 | |
26bb0f31 | 659 | |
8cdf0349 BP |
660 | def _uuid_to_row(atom, base): |
661 | if base.ref_table: | |
662 | return base.ref_table.rows.get(atom) | |
663 | else: | |
664 | return atom | |
665 | ||
26bb0f31 | 666 | |
def _row_to_uuid(value):
    # Rows are serialized by their UUID; every other value passes through
    # untouched.
    if isinstance(value, Row):
        return value.uuid
    return value
bf6ec045 | 672 | |
26bb0f31 | 673 | |
@functools.total_ordering
class Row(object):
    """A row within an IDL.

    The client may access the following attributes directly:

    - 'uuid': a uuid.UUID object whose value is the row's database UUID.

    - An attribute for each column in the Row's table, named for the column,
      whose values are as returned by Datum.to_python() for the column's type.

      If some error occurs (e.g. the database server's idea of the column is
      different from the IDL's idea), then the attribute's value is the
      "default" value returned by Datum.default() for the column's type.  (It
      is important to know this because the default value may violate
      constraints for the column's type, e.g. the default integer value is 0
      even if column constraints require the column's value to be positive.)

    When a transaction is active, column attributes may also be assigned new
    values.  Committing the transaction will then cause the new value to be
    stored into the database.

    *NOTE*: In the current implementation, the value of a column is a *copy*
    of the value in the database.  This means that modifying its value
    directly will have no useful effect.  For example, the following:
        row.mycolumn["a"] = "b"  # don't do this
    will not change anything in the database, even after commit.  To modify
    the column, instead assign the modified column value back to the column:
        d = row.mycolumn
        d["a"] = "b"
        row.mycolumn = d
    """
    def __init__(self, idl, table, uuid, data):
        """Initializes a Row belonging to 'table' in 'idl' with database UUID
        'uuid' and committed column data 'data' (a dict mapping column name to
        Datum, or None for a row inserted within the active transaction)."""
        # All of the explicit references to self.__dict__ below are required
        # to set real attributes without invoking self.__getattr__().
        self.__dict__["uuid"] = uuid

        self.__dict__["_idl"] = idl
        self.__dict__["_table"] = table

        # _data is the committed data.  It takes the following values:
        #
        # - A dictionary that maps every column name to a Datum, if the row
        #   exists in the committed form of the database.
        #
        # - None, if this row is newly inserted within the active transaction
        #   and thus has no committed form.
        self.__dict__["_data"] = data

        # _changes describes changes to this row within the active transaction.
        # It takes the following values:
        #
        # - {}, the empty dictionary, if no transaction is active or if the
        #   row has yet not been changed within this transaction.
        #
        # - A dictionary that maps a column name to its new Datum, if an
        #   active transaction changes those columns' values.
        #
        # - A dictionary that maps every column name to a Datum, if the row
        #   is newly inserted within the active transaction.
        #
        # - None, if this transaction deletes this row.
        self.__dict__["_changes"] = {}

        # _mutations describes changes to this row to be handled via a
        # mutate operation on the wire.  It takes the following values:
        #
        # - {}, the empty dictionary, if no transaction is active or if the
        #   row has yet not been mutated within this transaction.
        #
        # - A dictionary that contains two keys:
        #
        #   - "_inserts" contains a dictionary that maps column names to
        #     new keys/key-value pairs that should be inserted into the
        #     column
        #   - "_removes" contains a dictionary that maps column names to
        #     the keys/key-value pairs that should be removed from the
        #     column
        #
        # - None, if this transaction deletes this row.
        self.__dict__["_mutations"] = {}

        # A dictionary whose keys are the names of columns that must be
        # verified as prerequisites when the transaction commits.  The values
        # in the dictionary are all None.
        self.__dict__["_prereqs"] = {}
760 | ||
fbafc3c2 RB |
761 | def __lt__(self, other): |
762 | if not isinstance(other, Row): | |
763 | return NotImplemented | |
764 | return bool(self.__dict__['uuid'] < other.__dict__['uuid']) | |
765 | ||
766 | def __eq__(self, other): | |
767 | if not isinstance(other, Row): | |
768 | return NotImplemented | |
769 | return bool(self.__dict__['uuid'] == other.__dict__['uuid']) | |
770 | ||
771 | def __hash__(self): | |
772 | return int(self.__dict__['uuid']) | |
773 | ||
8cdf0349 BP |
    def __getattr__(self, column_name):
        """Returns the current (transaction-local) value of column
        'column_name', as a Python value.

        The value reflects committed data plus any pending writes
        (_changes) and pending set/map mutations (_inserts/_removes) made
        within the active transaction.  Raises AttributeError if
        'column_name' is not a column of this row's table or has no value
        visible to this row."""
        assert self._changes is not None
        assert self._mutations is not None

        try:
            column = self._table.columns[column_name]
        except KeyError:
            raise AttributeError("%s instance has no attribute '%s'" %
                                 (self.__class__.__name__, column_name))
        # A plain write within the transaction takes precedence over
        # committed data and mutations.
        datum = self._changes.get(column_name)
        inserts = None
        if '_inserts' in self._mutations.keys():
            inserts = self._mutations['_inserts'].get(column_name)
        removes = None
        if '_removes' in self._mutations.keys():
            removes = self._mutations['_removes'].get(column_name)
        if datum is None:
            if self._data is None:
                # Newly inserted row: only pending inserts can supply a
                # value.
                if inserts is None:
                    raise AttributeError("%s instance has no attribute '%s'" %
                                         (self.__class__.__name__,
                                          column_name))
                else:
                    datum = data.Datum.from_python(column.type,
                                                   inserts,
                                                   _row_to_uuid)
            elif column_name in self._data:
                # Committed value exists; overlay pending set/map mutations.
                datum = self._data[column_name]
                if column.type.is_set():
                    dlist = datum.as_list()
                    if inserts is not None:
                        dlist.extend(list(inserts))
                    if removes is not None:
                        removes_datum = data.Datum.from_python(column.type,
                                                               removes,
                                                               _row_to_uuid)
                        removes_list = removes_datum.as_list()
                        dlist = [x for x in dlist if x not in removes_list]
                    datum = data.Datum.from_python(column.type, dlist,
                                                   _row_to_uuid)
                elif column.type.is_map():
                    dmap = datum.to_python(_uuid_to_row)
                    if inserts is not None:
                        dmap.update(inserts)
                    if removes is not None:
                        for key in removes:
                            # A key both removed and re-inserted keeps its
                            # inserted value.
                            if key not in (inserts or {}):
                                del dmap[key]
                    datum = data.Datum.from_python(column.type, dmap,
                                                   _row_to_uuid)
            else:
                if inserts is None:
                    raise AttributeError("%s instance has no attribute '%s'" %
                                         (self.__class__.__name__,
                                          column_name))
                else:
                    # NOTE(review): here 'datum' is set to the raw inserts
                    # container, not a Datum, yet to_python() is called on it
                    # below -- looks inconsistent with the branches above;
                    # confirm against upstream behavior.
                    datum = inserts

        return datum.to_python(_uuid_to_row)
833 | ||
    def __setattr__(self, column_name, value):
        """Queues a write of 'value' to 'column_name' within the active
        transaction.  The database itself is only modified when the
        transaction commits.  Writes to read-only columns and values that
        fail conversion to the column's type are logged and dropped."""
        assert self._changes is not None
        assert self._idl.txn

        if ((self._table.name in self._idl.readonly) and
            (column_name in self._idl.readonly[self._table.name])):
            vlog.warn("attempting to write to readonly column %s"
                      % column_name)
            return

        column = self._table.columns[column_name]
        try:
            datum = data.Datum.from_python(column.type, value, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        self._idl.txn._write(self, column, datum)
853 | ||
a59912a0 RM |
854 | def addvalue(self, column_name, key): |
855 | self._idl.txn._txn_rows[self.uuid] = self | |
330b9c9c AB |
856 | column = self._table.columns[column_name] |
857 | try: | |
858 | data.Datum.from_python(column.type, key, _row_to_uuid) | |
859 | except error.Error as e: | |
860 | # XXX rate-limit | |
861 | vlog.err("attempting to write bad value to column %s (%s)" | |
862 | % (column_name, e)) | |
863 | return | |
864 | inserts = self._mutations.setdefault('_inserts', {}) | |
865 | column_value = inserts.setdefault(column_name, set()) | |
866 | column_value.add(key) | |
a59912a0 RM |
867 | |
868 | def delvalue(self, column_name, key): | |
869 | self._idl.txn._txn_rows[self.uuid] = self | |
330b9c9c AB |
870 | column = self._table.columns[column_name] |
871 | try: | |
872 | data.Datum.from_python(column.type, key, _row_to_uuid) | |
873 | except error.Error as e: | |
874 | # XXX rate-limit | |
875 | vlog.err("attempting to delete bad value from column %s (%s)" | |
876 | % (column_name, e)) | |
877 | return | |
878 | removes = self._mutations.setdefault('_removes', {}) | |
879 | column_value = removes.setdefault(column_name, set()) | |
880 | column_value.add(key) | |
a59912a0 RM |
881 | |
    def setkey(self, column_name, key, value):
        """Schedules the map in column 'column_name' to have 'key' set to
        'value' when the active transaction commits.  If the key already
        exists in the committed data, a removal of the old pair is queued
        first so the insert does not conflict."""
        self._idl.txn._txn_rows[self.uuid] = self
        column = self._table.columns[column_name]
        try:
            data.Datum.from_python(column.type, {key: value}, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        if self._data and column_name in self._data:
            # Remove existing key/value before updating.
            removes = self._mutations.setdefault('_removes', {})
            column_value = removes.setdefault(column_name, set())
            column_value.add(key)
        inserts = self._mutations.setdefault('_inserts', {})
        column_value = inserts.setdefault(column_name, {})
        column_value[key] = value
900 | ||
    def delkey(self, column_name, key, value=None):
        """Schedules 'key' to be deleted from the map in column
        'column_name' when the active transaction commits.

        If 'value' is given, the deletion is only queued when the committed
        data maps 'key' to exactly 'value'; any mismatch (or conversion
        error) makes this a no-op.
        NOTE(review): the check is "if value:", so a falsy value (0, "",
        etc.) skips verification entirely and deletes unconditionally --
        confirm this is intended."""
        self._idl.txn._txn_rows[self.uuid] = self
        if value:
            try:
                old_value = data.Datum.to_python(self._data[column_name],
                                                 _uuid_to_row)
            except error.Error:
                return
            if key not in old_value:
                return
            if old_value[key] != value:
                return
        removes = self._mutations.setdefault('_removes', {})
        column_value = removes.setdefault(column_name, set())
        column_value.add(key)
        return
917 | ||
d7d417fc TW |
    @classmethod
    def from_json(cls, idl, table, uuid, row_json):
        """Alternate constructor: builds a Row for 'table' from 'row_json',
        a JSON dict mapping column names to JSON datum values.  Unknown
        columns and values that fail to parse are logged and skipped rather
        than raising."""
        # NOTE(review): this local 'data' shadows the module-level
        # "ovs.db.data as data" import, hence the fully-qualified
        # ovs.db.data.Datum below.
        data = {}
        for column_name, datum_json in six.iteritems(row_json):
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s in table %s"
                          % (column_name, table.name))
                continue
            try:
                datum = ovs.db.data.Datum.from_json(column.type, datum_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue
            data[column_name] = datum
        return cls(idl, table, uuid, data)
937 | ||
8cdf0349 BP |
    def verify(self, column_name):
        """Causes the original contents of column 'column_name' in this row to
        be verified as a prerequisite to completing the transaction.  That is,
        if 'column_name' changed in this row (or if this row was deleted)
        between the time that the IDL originally read its contents and the time
        that the transaction commits, then the transaction aborts and
        Transaction.commit() returns Transaction.TRY_AGAIN.

        The intention is that, to ensure that no transaction commits based on
        dirty reads, an application should call Row.verify() on each data item
        read as part of a read-modify-write operation.

        In some cases Row.verify() reduces to a no-op, because the current
        value of the column is already known:

          - If this row is a row created by the current transaction (returned
            by Transaction.insert()).

          - If the column has already been modified within the current
            transaction.

        Because of the latter property, always call Row.verify() *before*
        modifying the column, for a given read-modify-write.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        # No-op for rows inserted in this transaction and for columns the
        # transaction has already written (their values are already known).
        if not self._data or column_name in self._changes:
            return

        self._prereqs[column_name] = None
969 | ||
    def delete(self):
        """Deletes this row from its table.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if self._data is None:
            # Row was inserted within this same transaction: drop it from
            # the transaction entirely so it is never sent to the server.
            del self._idl.txn._txn_rows[self.uuid]
        else:
            self._idl.txn._txn_rows[self.uuid] = self
        # _changes == None marks the row as deleted (see Row.__init__).
        self.__dict__["_changes"] = None
        del self._table.rows[self.uuid]
982 | ||
80c12152 SA |
983 | def fetch(self, column_name): |
984 | self._idl.txn._fetch(self, column_name) | |
985 | ||
94fbe1aa BP |
986 | def increment(self, column_name): |
987 | """Causes the transaction, when committed, to increment the value of | |
988 | 'column_name' within this row by 1. 'column_name' must have an integer | |
989 | type. After the transaction commits successfully, the client may | |
990 | retrieve the final (incremented) value of 'column_name' with | |
991 | Transaction.get_increment_new_value(). | |
992 | ||
993 | The client could accomplish something similar by reading and writing | |
994 | and verify()ing columns. However, increment() will never (by itself) | |
995 | cause a transaction to fail because of a verify error. | |
996 | ||
997 | The intended use is for incrementing the "next_cfg" column in | |
998 | the Open_vSwitch table.""" | |
999 | self._idl.txn._increment(self, column_name) | |
1000 | ||
26bb0f31 | 1001 | |
8cdf0349 BP |
1002 | def _uuid_name_from_uuid(uuid): |
1003 | return "row%s" % str(uuid).replace("-", "_") | |
1004 | ||
26bb0f31 | 1005 | |
8cdf0349 BP |
1006 | def _where_uuid_equals(uuid): |
1007 | return [["_uuid", "==", ["uuid", str(uuid)]]] | |
1008 | ||
26bb0f31 | 1009 | |
8cdf0349 BP |
1010 | class _InsertedRow(object): |
1011 | def __init__(self, op_index): | |
1012 | self.op_index = op_index | |
1013 | self.real = None | |
1014 | ||
26bb0f31 | 1015 | |
8cdf0349 | 1016 | class Transaction(object): |
2f926787 BP |
1017 | """A transaction may modify the contents of a database by modifying the |
1018 | values of columns, deleting rows, inserting rows, or adding checks that | |
1019 | columns in the database have not changed ("verify" operations), through | |
1020 | Row methods. | |
1021 | ||
1022 | Reading and writing columns and inserting and deleting rows are all | |
1023 | straightforward. The reasons to verify columns are less obvious. | |
1024 | Verification is the key to maintaining transactional integrity. Because | |
1025 | OVSDB handles multiple clients, it can happen that between the time that | |
1026 | OVSDB client A reads a column and writes a new value, OVSDB client B has | |
1027 | written that column. Client A's write should not ordinarily overwrite | |
1028 | client B's, especially if the column in question is a "map" column that | |
1029 | contains several more or less independent data items. If client A adds a | |
1030 | "verify" operation before it writes the column, then the transaction fails | |
1031 | in case client B modifies it first. Client A will then see the new value | |
1032 | of the column and compose a new transaction based on the new contents | |
1033 | written by client B. | |
1034 | ||
1035 | When a transaction is complete, which must be before the next call to | |
1036 | Idl.run(), call Transaction.commit() or Transaction.abort(). | |
1037 | ||
1038 | The life-cycle of a transaction looks like this: | |
1039 | ||
1040 | 1. Create the transaction and record the initial sequence number: | |
1041 | ||
1042 | seqno = idl.change_seqno(idl) | |
1043 | txn = Transaction(idl) | |
1044 | ||
1045 | 2. Modify the database with Row and Transaction methods. | |
1046 | ||
1047 | 3. Commit the transaction by calling Transaction.commit(). The first call | |
1048 | to this function probably returns Transaction.INCOMPLETE. The client | |
    must keep calling again as long as this remains true, calling Idl.run() in
1050 | between to let the IDL do protocol processing. (If the client doesn't | |
1051 | have anything else to do in the meantime, it can use | |
1052 | Transaction.commit_block() to avoid having to loop itself.) | |
1053 | ||
1054 | 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno | |
1055 | to change from the saved 'seqno' (it's possible that it's already | |
1056 | changed, in which case the client should not wait at all), then start | |
1057 | over from step 1. Only a call to Idl.run() will change the return value | |
1058 | of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)""" | |
1059 | ||
8cdf0349 | 1060 | # Status values that Transaction.commit() can return. |
eda26d40 RB |
1061 | |
1062 | # Not yet committed or aborted. | |
1063 | UNCOMMITTED = "uncommitted" | |
1064 | # Transaction didn't include any changes. | |
1065 | UNCHANGED = "unchanged" | |
1066 | # Commit in progress, please wait. | |
1067 | INCOMPLETE = "incomplete" | |
1068 | # ovsdb_idl_txn_abort() called. | |
1069 | ABORTED = "aborted" | |
1070 | # Commit successful. | |
1071 | SUCCESS = "success" | |
1072 | # Commit failed because a "verify" operation | |
1073 | # reported an inconsistency, due to a network | |
1074 | # problem, or other transient failure. Wait | |
1075 | # for a change, then try again. | |
1076 | TRY_AGAIN = "try again" | |
1077 | # Server hasn't given us the lock yet. | |
1078 | NOT_LOCKED = "not locked" | |
1079 | # Commit failed due to a hard error. | |
1080 | ERROR = "error" | |
8cdf0349 BP |
1081 | |
    @staticmethod
    def status_to_string(status):
        """Converts one of the status values that Transaction.commit() can
        return into a human-readable string.

        (The status values are in fact such strings already, so
        there's nothing to do.)"""
        return status
1090 | ||
    def __init__(self, idl):
        """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
        A given Idl may only have a single active transaction at a time.

        A Transaction may modify the contents of a database by assigning new
        values to columns (attributes of Row), deleting rows (with
        Row.delete()), or inserting rows (with Transaction.insert()).  It may
        also check that columns in the database have not changed with
        Row.verify().

        When a transaction is complete (which must be before the next call to
        Idl.run()), call Transaction.commit() or Transaction.abort()."""
        assert idl.txn is None

        # Install ourselves as the IDL's (single) active transaction.
        idl.txn = self
        self._request_id = None     # JSON-RPC id of the in-flight "transact".
        self.idl = idl
        self.dry_run = False        # If True, commit() appends an "abort" op.
        self._txn_rows = {}         # Rows touched, keyed by row UUID.
        self._status = Transaction.UNCOMMITTED
        self._error = None          # First error JSON from the server.
        self._comments = []         # Strings for a trailing "comment" op.

        # Row/column registered via Row.increment(), if any.
        self._inc_row = None
        self._inc_column = None

        # Pending Row.fetch() requests (dicts with "row"/"column_name").
        self._fetch_requests = []

        self._inserted_rows = {}  # Map from UUID to _InsertedRow
8cdf0349 BP |
1120 | |
1121 | def add_comment(self, comment): | |
80c12152 | 1122 | """Appends 'comment' to the comments that will be passed to the OVSDB |
8cdf0349 BP |
1123 | server when this transaction is committed. (The comment will be |
1124 | committed to the OVSDB log, which "ovsdb-tool show-log" can print in a | |
1125 | relatively human-readable form.)""" | |
1126 | self._comments.append(comment) | |
1127 | ||
8cdf0349 | 1128 | def wait(self, poller): |
2f926787 BP |
1129 | """Causes poll_block() to wake up if this transaction has completed |
1130 | committing.""" | |
8cdf0349 BP |
1131 | if self._status not in (Transaction.UNCOMMITTED, |
1132 | Transaction.INCOMPLETE): | |
1133 | poller.immediate_wake() | |
1134 | ||
    def _substitute_uuids(self, json):
        # Recursively rewrites ["uuid", "<u>"] references that point at rows
        # inserted by this transaction into ["named-uuid", "row<u>"] symbols,
        # since the server has not yet assigned those rows a real UUID.
        # Everything else is returned unchanged.
        if isinstance(json, (list, tuple)):
            if (len(json) == 2
                    and json[0] == 'uuid'
                    and ovs.ovsuuid.is_valid_string(json[1])):
                uuid = ovs.ovsuuid.from_string(json[1])
                row = self._txn_rows.get(uuid, None)
                # Only rows with no committed form (_data is None) need the
                # named-uuid indirection.
                if row and row._data is None:
                    return ["named-uuid", _uuid_name_from_uuid(uuid)]
            else:
                return [self._substitute_uuids(elem) for elem in json]
        return json
1147 | ||
    def __disassemble(self):
        # Detaches this transaction from the IDL and rolls back all of its
        # uncommitted effects on the in-memory replica: deleted rows are
        # restored to their tables, locally-inserted rows are dropped, and
        # per-row transaction state is reset.
        self.idl.txn = None

        for row in six.itervalues(self._txn_rows):
            if row._changes is None:
                # Row was deleted in this transaction: put it back.
                row._table.rows[row.uuid] = row
            elif row._data is None:
                # Row was inserted in this transaction: remove it.
                del row._table.rows[row.uuid]
            row.__dict__["_changes"] = {}
            row.__dict__["_mutations"] = {}
            row.__dict__["_prereqs"] = {}
        self._txn_rows = {}
1160 | ||
    def commit(self):
        """Attempts to commit 'txn'.  Returns the status of the commit
        operation, one of the following constants:

          Transaction.INCOMPLETE:

              The transaction is in progress, but not yet complete.  The
              caller should call again later, after calling Idl.run() to let
              the IDL do OVSDB protocol processing.

          Transaction.UNCHANGED:

              The transaction is complete.  (It didn't actually change the
              database, so the IDL didn't send any request to the database
              server.)

          Transaction.ABORTED:

              The caller previously called Transaction.abort().

          Transaction.SUCCESS:

              The transaction was successful.  The update made by the
              transaction (and possibly other changes made by other database
              clients) should already be visible in the IDL.

          Transaction.TRY_AGAIN:

              The transaction failed for some transient reason, e.g. because a
              "verify" operation reported an inconsistency or due to a network
              problem.  The caller should wait for a change to the database,
              then compose a new transaction, and commit the new transaction.

              Use Idl.change_seqno to wait for a change in the database.  It is
              important to use its value *before* the initial call to
              Transaction.commit() as the baseline for this purpose, because
              the change that one should wait for can happen after the initial
              call but before the call that returns Transaction.TRY_AGAIN, and
              using some other baseline value in that situation could cause an
              indefinite wait if the database rarely changes.

          Transaction.NOT_LOCKED:

              The transaction failed because the IDL has been configured to
              require a database lock (with Idl.set_lock()) but didn't
              get it yet or has already lost it.

        Committing a transaction rolls back all of the changes that it made to
        the IDL's copy of the database.  If the transaction commits
        successfully, then the database server will send an update and, thus,
        the IDL will be updated with the committed changes."""
        # The status can only change if we're the active transaction.
        # (Otherwise, our status will change only in Idl.run().)
        if self != self.idl.txn:
            return self._status

        # If we need a lock but don't have it, give up quickly.
        if self.idl.lock_name and not self.idl.has_lock:
            self._status = Transaction.NOT_LOCKED
            self.__disassemble()
            return self._status

        # operations[0] is the database name, so an operation appended at
        # list index i has result index i - 1 in the server's reply (hence
        # the "len(operations) - 1" index bookkeeping below).
        operations = [self.idl._db.name]

        # Assert that we have the required lock (avoiding a race).
        if self.idl.lock_name:
            operations.append({"op": "assert",
                               "lock": self.idl.lock_name})

        # Add prerequisites and declarations of new rows.
        for row in six.itervalues(self._txn_rows):
            if row._prereqs:
                rows = {}
                columns = []
                for column_name in row._prereqs:
                    columns.append(column_name)
                    rows[column_name] = row._data[column_name].to_json()
                # "wait" with timeout 0 fails the whole transaction unless
                # the columns still hold their originally-read values.
                operations.append({"op": "wait",
                                   "table": row._table.name,
                                   "timeout": 0,
                                   "where": _where_uuid_equals(row.uuid),
                                   "until": "==",
                                   "columns": columns,
                                   "rows": [rows]})

        # Add updates.
        any_updates = False
        for row in six.itervalues(self._txn_rows):
            if row._changes is None:
                # Row deleted by this transaction.
                if row._table.is_root:
                    operations.append({"op": "delete",
                                       "table": row._table.name,
                                       "where": _where_uuid_equals(row.uuid)})
                    any_updates = True
                else:
                    # Let ovsdb-server decide whether to really delete it.
                    pass
            elif row._changes:
                op = {"table": row._table.name}
                if row._data is None:
                    # Newly inserted row.
                    op["op"] = "insert"
                    op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
                    any_updates = True

                    op_index = len(operations) - 1
                    self._inserted_rows[row.uuid] = _InsertedRow(op_index)
                else:
                    op["op"] = "update"
                    op["where"] = _where_uuid_equals(row.uuid)

                row_json = {}
                op["row"] = row_json

                for column_name, datum in six.iteritems(row._changes):
                    if row._data is not None or not datum.is_default():
                        row_json[column_name] = (
                            self._substitute_uuids(datum.to_json()))

                        # If anything really changed, consider it an update.
                        # We can't suppress not-really-changed values earlier
                        # or transactions would become nonatomic (see the big
                        # comment inside Transaction._write()).
                        if (not any_updates and row._data is not None and
                                row._data[column_name] != datum):
                            any_updates = True

                if row._data is None or row_json:
                    operations.append(op)
            if row._mutations:
                # Pending set/map mutations from Row.addvalue()/delvalue()/
                # setkey()/delkey(), sent as a "mutate" operation.
                addop = False
                op = {"table": row._table.name}
                op["op"] = "mutate"
                if row._data is None:
                    # New row: the "where" clause must use the named-uuid.
                    op["where"] = self._substitute_uuids(
                        _where_uuid_equals(row.uuid))
                else:
                    # Existing row
                    op["where"] = _where_uuid_equals(row.uuid)
                op["mutations"] = []
                if '_removes' in row._mutations.keys():
                    for col, dat in six.iteritems(row._mutations['_removes']):
                        column = row._table.columns[col]
                        if column.type.is_map():
                            # Map deletes take a bare set of keys.
                            opdat = ["set"]
                            opdat.append(list(dat))
                        else:
                            opdat = ["set"]
                            inner_opdat = []
                            for ele in dat:
                                try:
                                    datum = data.Datum.from_python(
                                        column.type, ele, _row_to_uuid)
                                except error.Error:
                                    return
                                inner_opdat.append(
                                    self._substitute_uuids(datum.to_json()))
                            opdat.append(inner_opdat)
                        mutation = [col, "delete", opdat]
                        op["mutations"].append(mutation)
                        addop = True
                if '_inserts' in row._mutations.keys():
                    for col, val in six.iteritems(row._mutations['_inserts']):
                        column = row._table.columns[col]
                        if column.type.is_map():
                            opdat = ["map"]
                            datum = data.Datum.from_python(column.type, val,
                                                           _row_to_uuid)
                            opdat.append(datum.as_list())
                        else:
                            opdat = ["set"]
                            inner_opdat = []
                            for ele in val:
                                try:
                                    datum = data.Datum.from_python(
                                        column.type, ele, _row_to_uuid)
                                except error.Error:
                                    return
                                inner_opdat.append(
                                    self._substitute_uuids(datum.to_json()))
                            opdat.append(inner_opdat)
                        mutation = [col, "insert", opdat]
                        op["mutations"].append(mutation)
                        addop = True
                if addop:
                    operations.append(op)
                    any_updates = True

        if self._fetch_requests:
            for fetch in self._fetch_requests:
                # Remember the result index so the reply can be routed back
                # to the fetched row/column.
                fetch["index"] = len(operations) - 1
                operations.append({"op": "select",
                                   "table": fetch["row"]._table.name,
                                   "where": self._substitute_uuids(
                                       _where_uuid_equals(fetch["row"].uuid)),
                                   "columns": [fetch["column_name"]]})
            any_updates = True

        # Add increment.
        if self._inc_row and any_updates:
            self._inc_index = len(operations) - 1

            operations.append({"op": "mutate",
                               "table": self._inc_row._table.name,
                               "where": self._substitute_uuids(
                                   _where_uuid_equals(self._inc_row.uuid)),
                               "mutations": [[self._inc_column, "+=", 1]]})
            # Follow with a select so the incremented value comes back in
            # the reply (see get_increment_new_value()).
            operations.append({"op": "select",
                               "table": self._inc_row._table.name,
                               "where": self._substitute_uuids(
                                   _where_uuid_equals(self._inc_row.uuid)),
                               "columns": [self._inc_column]})

        # Add comment.
        if self._comments:
            operations.append({"op": "comment",
                               "comment": "\n".join(self._comments)})

        # Dry run?
        if self.dry_run:
            operations.append({"op": "abort"})

        if not any_updates:
            self._status = Transaction.UNCHANGED
        else:
            msg = ovs.jsonrpc.Message.create_request("transact", operations)
            self._request_id = msg.id
            if not self.idl._session.send(msg):
                # Sent successfully (send() returns a zero/falsy error code);
                # wait for the server's reply in Idl.run().
                self.idl._outstanding_txns[self._request_id] = self
                self._status = Transaction.INCOMPLETE
            else:
                self._status = Transaction.TRY_AGAIN

        self.__disassemble()
        return self._status
1396 | ||
1397 | def commit_block(self): | |
2f926787 BP |
1398 | """Attempts to commit this transaction, blocking until the commit |
1399 | either succeeds or fails. Returns the final commit status, which may | |
1400 | be any Transaction.* value other than Transaction.INCOMPLETE. | |
1401 | ||
1402 | This function calls Idl.run() on this transaction'ss IDL, so it may | |
1403 | cause Idl.change_seqno to change.""" | |
8cdf0349 BP |
1404 | while True: |
1405 | status = self.commit() | |
1406 | if status != Transaction.INCOMPLETE: | |
1407 | return status | |
1408 | ||
1409 | self.idl.run() | |
1410 | ||
1411 | poller = ovs.poller.Poller() | |
1412 | self.idl.wait(poller) | |
1413 | self.wait(poller) | |
1414 | poller.block() | |
1415 | ||
    def get_increment_new_value(self):
        """Returns the final (incremented) value of the column in this
        transaction that was set to be incremented by Row.increment.  This
        transaction must have committed successfully."""
        assert self._status == Transaction.SUCCESS
        # _inc_new_value is filled in from the reply to the "select" that
        # commit() appends after the increment's "mutate".
        return self._inc_new_value
1422 | ||
1423 | def abort(self): | |
1424 | """Aborts this transaction. If Transaction.commit() has already been | |
1425 | called then the transaction might get committed anyhow.""" | |
1426 | self.__disassemble() | |
1427 | if self._status in (Transaction.UNCOMMITTED, | |
1428 | Transaction.INCOMPLETE): | |
1429 | self._status = Transaction.ABORTED | |
1430 | ||
1431 | def get_error(self): | |
1432 | """Returns a string representing this transaction's current status, | |
1433 | suitable for use in log messages.""" | |
1434 | if self._status != Transaction.ERROR: | |
1435 | return Transaction.status_to_string(self._status) | |
1436 | elif self._error: | |
1437 | return self._error | |
1438 | else: | |
1439 | return "no error details available" | |
1440 | ||
    def __set_error_json(self, json):
        # Records the first error JSON reported by the server for this
        # transaction; later errors are ignored so get_error() reports the
        # original cause.
        # NOTE(review): the file header shown does not import ovs.json
        # directly -- presumably it is reachable through "import
        # ovs.jsonrpc"; confirm.
        if self._error is None:
            self._error = ovs.json.to_string(json)
1444 | ||
    def get_insert_uuid(self, uuid):
        """Finds and returns the permanent UUID that the database assigned to a
        newly inserted row, given the UUID that Transaction.insert() assigned
        locally to that row.

        Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
        or if it was assigned by that function and then deleted by Row.delete()
        within the same transaction.  (Rows that are inserted and then deleted
        within a single transaction are never sent to the database server, so
        it never assigns them a permanent UUID.)

        This transaction must have completed successfully."""
        assert self._status in (Transaction.SUCCESS,
                                Transaction.UNCHANGED)
        inserted_row = self._inserted_rows.get(uuid)
        if inserted_row:
            # 'real' was filled in from the server's reply.
            return inserted_row.real
        return None
1463 | ||
94fbe1aa BP |
1464 | def _increment(self, row, column): |
1465 | assert not self._inc_row | |
1466 | self._inc_row = row | |
1467 | self._inc_column = column | |
1468 | ||
80c12152 | 1469 | def _fetch(self, row, column_name): |
a0631d92 | 1470 | self._fetch_requests.append({"row": row, "column_name": column_name}) |
80c12152 | 1471 | |
8cdf0349 BP |
1472 | def _write(self, row, column, datum): |
1473 | assert row._changes is not None | |
a59912a0 | 1474 | assert row._mutations is not None |
8cdf0349 BP |
1475 | |
1476 | txn = row._idl.txn | |
1477 | ||
1478 | # If this is a write-only column and the datum being written is the | |
1479 | # same as the one already there, just skip the update entirely. This | |
1480 | # is worth optimizing because we have a lot of columns that get | |
1481 | # periodically refreshed into the database but don't actually change | |
1482 | # that often. | |
1483 | # | |
1484 | # We don't do this for read/write columns because that would break | |
1485 | # atomicity of transactions--some other client might have written a | |
1486 | # different value in that column since we read it. (But if a whole | |
1487 | # transaction only does writes of existing values, without making any | |
1488 | # real changes, we will drop the whole transaction later in | |
1489 | # ovsdb_idl_txn_commit().) | |
37520ab3 RB |
1490 | if (not column.alert and row._data and |
1491 | row._data.get(column.name) == datum): | |
8cdf0349 BP |
1492 | new_value = row._changes.get(column.name) |
1493 | if new_value is None or new_value == datum: | |
1494 | return | |
1495 | ||
1496 | txn._txn_rows[row.uuid] = row | |
330b9c9c AB |
1497 | if '_inserts' in row._mutations: |
1498 | row._mutations['_inserts'].pop(column.name, None) | |
1499 | if '_removes' in row._mutations: | |
1500 | row._mutations['_removes'].pop(column.name, None) | |
1501 | row._changes[column.name] = datum.copy() | |
8cdf0349 BP |
1502 | |
1503 | def insert(self, table, new_uuid=None): | |
1504 | """Inserts and returns a new row in 'table', which must be one of the | |
1505 | ovs.db.schema.TableSchema objects in the Idl's 'tables' dict. | |
1506 | ||
1507 | The new row is assigned a provisional UUID. If 'uuid' is None then one | |
1508 | is randomly generated; otherwise 'uuid' should specify a randomly | |
1509 | generated uuid.UUID not otherwise in use. ovsdb-server will assign a | |
1510 | different UUID when 'txn' is committed, but the IDL will replace any | |
1511 | uses of the provisional UUID in the data to be to be committed by the | |
1512 | UUID assigned by ovsdb-server.""" | |
1513 | assert self._status == Transaction.UNCOMMITTED | |
1514 | if new_uuid is None: | |
1515 | new_uuid = uuid.uuid4() | |
1516 | row = Row(self.idl, table, new_uuid, None) | |
1517 | table.rows[row.uuid] = row | |
1518 | self._txn_rows[row.uuid] = row | |
1519 | return row | |
1520 | ||
1521 | def _process_reply(self, msg): | |
1522 | if msg.type == ovs.jsonrpc.Message.T_ERROR: | |
1523 | self._status = Transaction.ERROR | |
da2d45c6 | 1524 | elif not isinstance(msg.result, (list, tuple)): |
8cdf0349 | 1525 | # XXX rate-limit |
3a656eaf | 1526 | vlog.warn('reply to "transact" is not JSON array') |
8cdf0349 BP |
1527 | else: |
1528 | hard_errors = False | |
1529 | soft_errors = False | |
1530 | lock_errors = False | |
1531 | ||
1532 | ops = msg.result | |
1533 | for op in ops: | |
1534 | if op is None: | |
1535 | # This isn't an error in itself but indicates that some | |
1536 | # prior operation failed, so make sure that we know about | |
1537 | # it. | |
1538 | soft_errors = True | |
da2d45c6 | 1539 | elif isinstance(op, dict): |
8cdf0349 BP |
1540 | error = op.get("error") |
1541 | if error is not None: | |
1542 | if error == "timed out": | |
1543 | soft_errors = True | |
1544 | elif error == "not owner": | |
1545 | lock_errors = True | |
1546 | elif error == "aborted": | |
1547 | pass | |
1548 | else: | |
1549 | hard_errors = True | |
1550 | self.__set_error_json(op) | |
1551 | else: | |
1552 | hard_errors = True | |
1553 | self.__set_error_json(op) | |
1554 | # XXX rate-limit | |
3a656eaf | 1555 | vlog.warn("operation reply is not JSON null or object") |
8cdf0349 BP |
1556 | |
1557 | if not soft_errors and not hard_errors and not lock_errors: | |
94fbe1aa | 1558 | if self._inc_row and not self.__process_inc_reply(ops): |
8cdf0349 | 1559 | hard_errors = True |
80c12152 SA |
1560 | if self._fetch_requests: |
1561 | if self.__process_fetch_reply(ops): | |
1562 | self.idl.change_seqno += 1 | |
1563 | else: | |
1564 | hard_errors = True | |
8cdf0349 | 1565 | |
cb96c1b2 | 1566 | for insert in six.itervalues(self._inserted_rows): |
8cdf0349 BP |
1567 | if not self.__process_insert_reply(insert, ops): |
1568 | hard_errors = True | |
1569 | ||
1570 | if hard_errors: | |
1571 | self._status = Transaction.ERROR | |
1572 | elif lock_errors: | |
1573 | self._status = Transaction.NOT_LOCKED | |
1574 | elif soft_errors: | |
854a94d9 | 1575 | self._status = Transaction.TRY_AGAIN |
8cdf0349 BP |
1576 | else: |
1577 | self._status = Transaction.SUCCESS | |
1578 | ||
1579 | @staticmethod | |
1580 | def __check_json_type(json, types, name): | |
1581 | if not json: | |
1582 | # XXX rate-limit | |
3a656eaf | 1583 | vlog.warn("%s is missing" % name) |
8cdf0349 | 1584 | return False |
da2d45c6 | 1585 | elif not isinstance(json, tuple(types)): |
8cdf0349 | 1586 | # XXX rate-limit |
3a656eaf | 1587 | vlog.warn("%s has unexpected type %s" % (name, type(json))) |
8cdf0349 BP |
1588 | return False |
1589 | else: | |
1590 | return True | |
1591 | ||
80c12152 SA |
1592 | def __process_fetch_reply(self, ops): |
1593 | update = False | |
1594 | for fetch_request in self._fetch_requests: | |
1595 | row = fetch_request["row"] | |
1596 | column_name = fetch_request["column_name"] | |
1597 | index = fetch_request["index"] | |
1598 | table = row._table | |
1599 | ||
1600 | select = ops[index] | |
1601 | fetched_rows = select.get("rows") | |
1602 | if not Transaction.__check_json_type(fetched_rows, (list, tuple), | |
1603 | '"select" reply "rows"'): | |
1604 | return False | |
1605 | if len(fetched_rows) != 1: | |
1606 | # XXX rate-limit | |
1607 | vlog.warn('"select" reply "rows" has %d elements ' | |
e8049bc5 | 1608 | 'instead of 1' % len(fetched_rows)) |
80c12152 SA |
1609 | continue |
1610 | fetched_row = fetched_rows[0] | |
1611 | if not Transaction.__check_json_type(fetched_row, (dict,), | |
1612 | '"select" reply row'): | |
1613 | continue | |
1614 | ||
1615 | column = table.columns.get(column_name) | |
1616 | datum_json = fetched_row.get(column_name) | |
a59912a0 | 1617 | datum = data.Datum.from_json(column.type, datum_json) |
80c12152 SA |
1618 | |
1619 | row._data[column_name] = datum | |
1620 | update = True | |
1621 | ||
1622 | return update | |
1623 | ||
8cdf0349 BP |
1624 | def __process_inc_reply(self, ops): |
1625 | if self._inc_index + 2 > len(ops): | |
1626 | # XXX rate-limit | |
3a656eaf EJ |
1627 | vlog.warn("reply does not contain enough operations for " |
1628 | "increment (has %d, needs %d)" % | |
1629 | (len(ops), self._inc_index + 2)) | |
8cdf0349 BP |
1630 | |
1631 | # We know that this is a JSON object because the loop in | |
1632 | # __process_reply() already checked. | |
1633 | mutate = ops[self._inc_index] | |
1634 | count = mutate.get("count") | |
8f808842 | 1635 | if not Transaction.__check_json_type(count, six.integer_types, |
8cdf0349 BP |
1636 | '"mutate" reply "count"'): |
1637 | return False | |
1638 | if count != 1: | |
1639 | # XXX rate-limit | |
3a656eaf | 1640 | vlog.warn('"mutate" reply "count" is %d instead of 1' % count) |
8cdf0349 BP |
1641 | return False |
1642 | ||
1643 | select = ops[self._inc_index + 1] | |
1644 | rows = select.get("rows") | |
1645 | if not Transaction.__check_json_type(rows, (list, tuple), | |
1646 | '"select" reply "rows"'): | |
1647 | return False | |
1648 | if len(rows) != 1: | |
1649 | # XXX rate-limit | |
3a656eaf EJ |
1650 | vlog.warn('"select" reply "rows" has %d elements ' |
1651 | 'instead of 1' % len(rows)) | |
8cdf0349 BP |
1652 | return False |
1653 | row = rows[0] | |
1654 | if not Transaction.__check_json_type(row, (dict,), | |
1655 | '"select" reply row'): | |
1656 | return False | |
1657 | column = row.get(self._inc_column) | |
8f808842 | 1658 | if not Transaction.__check_json_type(column, six.integer_types, |
8cdf0349 BP |
1659 | '"select" reply inc column'): |
1660 | return False | |
1661 | self._inc_new_value = column | |
1662 | return True | |
1663 | ||
1664 | def __process_insert_reply(self, insert, ops): | |
1665 | if insert.op_index >= len(ops): | |
1666 | # XXX rate-limit | |
3a656eaf EJ |
1667 | vlog.warn("reply does not contain enough operations " |
1668 | "for insert (has %d, needs %d)" | |
1669 | % (len(ops), insert.op_index)) | |
8cdf0349 BP |
1670 | return False |
1671 | ||
1672 | # We know that this is a JSON object because the loop in | |
1673 | # __process_reply() already checked. | |
1674 | reply = ops[insert.op_index] | |
1675 | json_uuid = reply.get("uuid") | |
1676 | if not Transaction.__check_json_type(json_uuid, (tuple, list), | |
1677 | '"insert" reply "uuid"'): | |
1678 | return False | |
1679 | ||
1680 | try: | |
1681 | uuid_ = ovs.ovsuuid.from_json(json_uuid) | |
1682 | except error.Error: | |
1683 | # XXX rate-limit | |
3a656eaf | 1684 | vlog.warn('"insert" reply "uuid" is not a JSON UUID') |
8cdf0349 | 1685 | return False |
bf6ec045 | 1686 | |
8cdf0349 BP |
1687 | insert.real = uuid_ |
1688 | return True | |
ad0991e6 EJ |
1689 | |
1690 | ||
class SchemaHelper(object):
    """IDL Schema helper.

    This class encapsulates the logic required to generate schemas suitable
    for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
    they are interested in using register_columns().  When finished, the
    get_idl_schema() function may be called.

    The location on disk of the schema used may be found in the
    'schema_location' variable."""

    def __init__(self, location=None, schema_json=None):
        """Creates a new Schema object.

        'location' file path to ovs schema. None means default location
        'schema_json' schema in json representation in memory
        """

        if location and schema_json:
            raise ValueError("both location and schema_json can't be "
                             "specified. it's ambiguous.")
        if schema_json is None:
            if location is None:
                location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
            schema_json = ovs.json.from_file(location)

        self.schema_json = schema_json
        self._tables = {}       # table name -> set of column names
        self._readonly = {}     # table name -> list of readonly column names
        self._all = False       # True when register_all() was called

    def register_columns(self, table, columns, readonly=None):
        """Registers interest in the given 'columns' of 'table'.  Future calls
        to get_idl_schema() will include 'table':column for each column in
        'columns'.  This function automatically avoids adding duplicate entries
        to the schema.
        A subset of 'columns' can be specified as 'readonly'.  The readonly
        columns are not replicated but can be fetched on-demand by the user
        with Row.fetch().

        'table' must be a string.
        'columns' must be a list of strings.
        'readonly' must be a list of strings (default: no readonly columns).
        """

        assert isinstance(table, six.string_types)
        assert isinstance(columns, list)
        # Use None as the default rather than a mutable [] so that tables
        # registered without readonly columns don't share one list object.
        if readonly is None:
            readonly = []

        columns = set(columns) | self._tables.get(table, set())
        self._tables[table] = columns
        self._readonly[table] = readonly

    def register_table(self, table):
        """Registers interest in all columns of the given 'table'.  Future
        calls to get_idl_schema() will include all columns of 'table'.

        'table' must be a string
        """
        assert isinstance(table, six.string_types)
        self._tables[table] = set()  # empty set means all columns in the table

    def register_all(self):
        """Registers interest in every column of every table."""
        self._all = True

    def get_idl_schema(self):
        """Gets a schema appropriate for the creation of an 'ovs.db.idl.Idl'
        object based on columns registered using the register_columns()
        function."""

        schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
        self.schema_json = None

        if not self._all:
            schema_tables = {}
            for table, columns in six.iteritems(self._tables):
                schema_tables[table] = (
                    self._keep_table_columns(schema, table, columns))

            schema.tables = schema_tables
        schema.readonly = self._readonly
        return schema

    def _keep_table_columns(self, schema, table_name, columns):
        # Prunes 'schema's description of 'table_name' down to just the
        # registered 'columns' (an empty set keeps every column).
        assert table_name in schema.tables
        table = schema.tables[table_name]

        if not columns:
            # empty set means all columns in the table
            return table

        new_columns = {}
        for column_name in columns:
            assert isinstance(column_name, six.string_types)
            assert column_name in table.columns

            new_columns[column_name] = table.columns[column_name]

        table.columns = new_columns
        return table