]> git.proxmox.com Git - mirror_ovs.git/blob - ovsdb/trigger.c
raft: Send all missing logs in one single append_request.
[mirror_ovs.git] / ovsdb / trigger.c
1 /* Copyright (c) 2009, 2010, 2011, 2012 Nicira, Inc.
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <config.h>
17
18 #include "trigger.h"
19
20 #include <limits.h>
21 #include <string.h>
22
23 #include "file.h"
24 #include "openvswitch/json.h"
25 #include "jsonrpc.h"
26 #include "ovsdb.h"
27 #include "ovsdb-error.h"
28 #include "openvswitch/poll-loop.h"
29 #include "server.h"
30 #include "transaction.h"
31 #include "openvswitch/vlog.h"
32 #include "util.h"
33
34 VLOG_DEFINE_THIS_MODULE(trigger);
35
/* File-local helpers, defined after the public API below. */

/* State machine driver. */
static bool ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now);

/* Completion helpers: each one leaves the trigger with a reply and hands it
 * over to its session's list of completions. */
static void ovsdb_trigger_complete(struct ovsdb_trigger *t);
static void trigger_success(struct ovsdb_trigger *t, struct json *result);
static void trigger_convert_error(struct ovsdb_trigger *t,
                                  struct ovsdb_error *error);
41
42 bool
43 ovsdb_trigger_init(struct ovsdb_session *session, struct ovsdb *db,
44 struct ovsdb_trigger *trigger,
45 struct jsonrpc_msg *request, long long int now,
46 bool read_only, const char *role, const char *id)
47 {
48 ovs_assert(!strcmp(request->method, "transact") ||
49 !strcmp(request->method, "convert"));
50 trigger->session = session;
51 trigger->db = db;
52 ovs_list_push_back(&trigger->db->triggers, &trigger->node);
53 trigger->request = request;
54 trigger->reply = NULL;
55 trigger->progress = NULL;
56 trigger->created = now;
57 trigger->timeout_msec = LLONG_MAX;
58 trigger->read_only = read_only;
59 trigger->role = nullable_xstrdup(role);
60 trigger->id = nullable_xstrdup(id);
61 return ovsdb_trigger_try(trigger, now);
62 }
63
64 void
65 ovsdb_trigger_destroy(struct ovsdb_trigger *trigger)
66 {
67 ovsdb_txn_progress_destroy(trigger->progress);
68 ovs_list_remove(&trigger->node);
69 jsonrpc_msg_destroy(trigger->request);
70 jsonrpc_msg_destroy(trigger->reply);
71 free(trigger->role);
72 free(trigger->id);
73 }
74
75 bool
76 ovsdb_trigger_is_complete(const struct ovsdb_trigger *trigger)
77 {
78 return trigger->reply && !trigger->progress;
79 }
80
81 struct jsonrpc_msg *
82 ovsdb_trigger_steal_reply(struct ovsdb_trigger *trigger)
83 {
84 struct jsonrpc_msg *reply = trigger->reply;
85 trigger->reply = NULL;
86 return reply;
87 }
88
89 /* Cancels 'trigger'. 'reason' should be a human-readable reason for log
90 * messages etc. */
91 void
92 ovsdb_trigger_cancel(struct ovsdb_trigger *trigger, const char *reason)
93 {
94 if (trigger->progress) {
95 /* The transaction still might complete asynchronously, but we can stop
96 * tracking it. */
97 ovsdb_txn_progress_destroy(trigger->progress);
98 trigger->progress = NULL;
99 }
100
101 jsonrpc_msg_destroy(trigger->reply);
102 trigger->reply = NULL;
103
104 if (!strcmp(trigger->request->method, "transact")) {
105 /* There's no place to stick 'reason' into the error reply because RFC
106 * 7047 prescribes a fix form for these messages, see section 4.1.4. */
107 trigger->reply = jsonrpc_create_error(json_string_create("canceled"),
108 trigger->request->id);
109 ovsdb_trigger_complete(trigger);
110 } else if (!strcmp(trigger->request->method, "convert")) {
111 trigger_convert_error(
112 trigger,
113 ovsdb_error("canceled", "database conversion canceled because %s",
114 reason));
115 }
116 }
117
118 void
119 ovsdb_trigger_prereplace_db(struct ovsdb_trigger *trigger)
120 {
121 if (!ovsdb_trigger_is_complete(trigger)) {
122 if (!strcmp(trigger->request->method, "transact")) {
123 ovsdb_trigger_cancel(trigger, "database schema is changing");
124 } else if (!strcmp(trigger->request->method, "convert")) {
125 /* We don't cancel "convert" requests when a database is being
126 * replaced for two reasons. First, we expect the administrator to
127 * do some kind of sensible synchronization on conversion requests,
128 * that is, it only really makes sense for the admin to do a single
129 * conversion at a time at a scheduled point. Second, if we did
130 * then every "convert" request would end up getting canceled since
131 * "convert" itself causes the database to be replaced. */
132 } else {
133 OVS_NOT_REACHED();
134 }
135 }
136 }
137
138 bool
139 ovsdb_trigger_run(struct ovsdb *db, long long int now)
140 {
141 struct ovsdb_trigger *t, *next;
142
143 bool run_triggers = db->run_triggers;
144 db->run_triggers_now = db->run_triggers = false;
145
146 bool disconnect_all = false;
147
148 LIST_FOR_EACH_SAFE (t, next, node, &db->triggers) {
149 if (run_triggers
150 || now - t->created >= t->timeout_msec
151 || t->progress) {
152 if (ovsdb_trigger_try(t, now)) {
153 disconnect_all = true;
154 }
155 }
156 }
157 return disconnect_all;
158 }
159
160 void
161 ovsdb_trigger_wait(struct ovsdb *db, long long int now)
162 {
163 if (db->run_triggers_now) {
164 poll_immediate_wake();
165 } else {
166 long long int deadline = LLONG_MAX;
167 struct ovsdb_trigger *t;
168
169 LIST_FOR_EACH (t, node, &db->triggers) {
170 if (t->created < LLONG_MAX - t->timeout_msec) {
171 long long int t_deadline = t->created + t->timeout_msec;
172 if (deadline > t_deadline) {
173 deadline = t_deadline;
174 if (now >= deadline) {
175 break;
176 }
177 }
178 }
179 }
180
181 if (deadline < LLONG_MAX) {
182 poll_timer_wait_until(deadline);
183 }
184 }
185 }
186
187 static bool
188 ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
189 {
190 /* Handle "initialized" state. */
191 if (!t->reply) {
192 ovs_assert(!t->progress);
193
194 struct ovsdb_txn *txn = NULL;
195 struct ovsdb *newdb = NULL;
196 if (!strcmp(t->request->method, "transact")) {
197 if (!ovsdb_txn_precheck_prereq(t->db)) {
198 return false;
199 }
200
201 bool durable;
202
203 struct json *result;
204 txn = ovsdb_execute_compose(
205 t->db, t->session, t->request->params, t->read_only,
206 t->role, t->id, now - t->created, &t->timeout_msec,
207 &durable, &result);
208 if (!txn) {
209 if (result) {
210 /* Complete. There was an error but we still represent it
211 * in JSON-RPC as a successful result. */
212 trigger_success(t, result);
213 } else {
214 /* Unsatisfied "wait" condition. Take no action now, retry
215 * later. */
216 }
217 return false;
218 }
219
220 /* Transition to "committing" state. */
221 t->reply = jsonrpc_create_reply(result, t->request->id);
222 t->progress = ovsdb_txn_propose_commit(txn, durable);
223 } else if (!strcmp(t->request->method, "convert")) {
224 /* Permission check. */
225 if (t->role && *t->role) {
226 trigger_convert_error(
227 t, ovsdb_perm_error(
228 "RBAC rules for client \"%s\" role \"%s\" prohibit "
229 "\"convert\" of database %s "
230 "(only the root role may convert databases)",
231 t->id, t->role, t->db->schema->name));
232 return false;
233 }
234
235 /* Validate parameters. */
236 const struct json *params = t->request->params;
237 if (params->type != JSON_ARRAY || params->array.n != 2) {
238 trigger_convert_error(t, ovsdb_syntax_error(params, NULL,
239 "array expected"));
240 return false;
241 }
242
243 /* Parse new schema and make a converted copy. */
244 const struct json *new_schema_json = params->array.elems[1];
245 struct ovsdb_schema *new_schema;
246 struct ovsdb_error *error
247 = ovsdb_schema_from_json(new_schema_json, &new_schema);
248 if (!error && strcmp(new_schema->name, t->db->schema->name)) {
249 error = ovsdb_error("invalid parameters",
250 "new schema name (%s) does not match "
251 "database name (%s)",
252 new_schema->name, t->db->schema->name);
253 }
254 if (!error) {
255 error = ovsdb_convert(t->db, new_schema, &newdb);
256 }
257 ovsdb_schema_destroy(new_schema);
258 if (error) {
259 trigger_convert_error(t, error);
260 return false;
261 }
262
263 /* Make the new copy into a transaction log record. */
264 struct json *txn_json = ovsdb_to_txn_json(
265 newdb, "converted by ovsdb-server");
266
267 /* Propose the change. */
268 t->progress = ovsdb_txn_propose_schema_change(
269 t->db, new_schema_json, txn_json);
270 json_destroy(txn_json);
271 t->reply = jsonrpc_create_reply(json_object_create(),
272 t->request->id);
273 } else {
274 OVS_NOT_REACHED();
275 }
276
277 /* If the transaction committed synchronously, complete it and
278 * transition to "complete". This is more than an optimization because
279 * the file-based storage isn't implemented to read back the
280 * transactions that we write (which is an ugly broken abstraction but
281 * it's what we have). */
282 if (ovsdb_txn_progress_is_complete(t->progress)
283 && !ovsdb_txn_progress_get_error(t->progress)) {
284 if (txn) {
285 ovsdb_txn_complete(txn);
286 }
287 ovsdb_txn_progress_destroy(t->progress);
288 t->progress = NULL;
289 ovsdb_trigger_complete(t);
290 if (newdb) {
291 ovsdb_replace(t->db, newdb);
292 return true;
293 }
294 return false;
295 }
296 ovsdb_destroy(newdb);
297
298 /* Fall through to the general handling for the "committing" state. We
299 * abort the transaction--if and when it eventually commits, we'll read
300 * it back from storage and replay it locally. */
301 if (txn) {
302 ovsdb_txn_abort(txn);
303 }
304 }
305
306 /* Handle "committing" state. */
307 if (t->progress) {
308 if (!ovsdb_txn_progress_is_complete(t->progress)) {
309 return false;
310 }
311
312 /* Transition to "complete". */
313 struct ovsdb_error *error
314 = ovsdb_error_clone(ovsdb_txn_progress_get_error(t->progress));
315 ovsdb_txn_progress_destroy(t->progress);
316 t->progress = NULL;
317
318 if (error) {
319 if (!strcmp(ovsdb_error_get_tag(error), "cluster error")) {
320 /* Temporary error. Transition back to "initialized" state to
321 * try again. */
322 char *err_s = ovsdb_error_to_string(error);
323 VLOG_DBG("cluster error %s", err_s);
324
325 jsonrpc_msg_destroy(t->reply);
326 t->reply = NULL;
327 t->db->run_triggers = true;
328 if (!strstr(err_s, "not leader")) {
329 t->db->run_triggers_now = true;
330 }
331 free(err_s);
332 ovsdb_error_destroy(error);
333 } else {
334 /* Permanent error. Transition to "completed" state to report
335 * it. */
336 if (!strcmp(t->request->method, "transact")) {
337 json_array_add(t->reply->result,
338 ovsdb_error_to_json_free(error));
339 ovsdb_trigger_complete(t);
340 } else if (!strcmp(t->request->method, "convert")) {
341 jsonrpc_msg_destroy(t->reply);
342 t->reply = NULL;
343 trigger_convert_error(t, error);
344 }
345 }
346 } else {
347 /* Success. */
348 ovsdb_trigger_complete(t);
349 }
350
351 return false;
352 }
353
354 OVS_NOT_REACHED();
355 }
356
357 static void
358 ovsdb_trigger_complete(struct ovsdb_trigger *t)
359 {
360 ovs_assert(t->reply);
361 ovs_list_remove(&t->node);
362 ovs_list_push_back(&t->session->completions, &t->node);
363 }
364
365 /* Makes a "convert" request into an error.
366 *
367 * This is not suitable for "transact" requests because their replies should
368 * never be bare ovsdb_errors: RFC 7047 says that their replies must either be
369 * a JSON-RPC reply that contains an array of operation replies (which can be
370 * errors), or a JSON-RPC error whose "error" member is simply "canceled". */
371 static void
372 trigger_convert_error(struct ovsdb_trigger *t, struct ovsdb_error *error)
373 {
374 ovs_assert(!strcmp(t->request->method, "convert"));
375 ovs_assert(error && !t->reply);
376 t->reply = jsonrpc_create_error(
377 ovsdb_error_to_json_free(error), t->request->id);
378 ovsdb_trigger_complete(t);
379 }
380
381 static void
382 trigger_success(struct ovsdb_trigger *t, struct json *result)
383 {
384 ovs_assert(result && !t->reply);
385 t->reply = jsonrpc_create_reply(result, t->request->id);
386 ovsdb_trigger_complete(t);
387 }