/*
 * Copyright (c) 2017-19  David Lamparter, for NetDEF, Inc.
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

/* implementation notes: this is an epoch-based RCU implementation. rcu_seq
 * (global variable) counts the current epoch. Threads hold a specific epoch
 * in rcu_read_lock(). This is the oldest epoch a thread might be accessing
 * data from.
 *
 * The rcu_seq global is only pushed forward on rcu_read_lock() and
 * rcu_read_unlock() calls. This makes things a tad more efficient since
 * those are the only places it matters:
 * - on rcu_read_lock, we don't want to hold an old epoch pointlessly
 * - on rcu_read_unlock, we want to make sure we're not stuck on an old epoch
 *   when heading into a long idle period where no thread holds RCU
 *
 * rcu_thread structures themselves are RCU-free'd.
 *
 * rcu_head structures are the most iffy; normally for an ATOMLIST we would
 * need to make sure we use rcu_free or pthread_rwlock to deallocate old items
 * to prevent ABA or use-after-free problems. However, our ATOMLIST code
 * guarantees that if the list remains non-empty in all cases, we only need
 * the "last" pointer to do an "add_tail()", i.e. we can't run into ABA/UAF
 * issues - but we do need to keep at least 1 item on the list.
 *
 * (Search the atomlist code for all uses of "last")
 */
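
/* Illustrative usage sketch (struct, function, and MTYPE names here are
 * hypothetical): a reader pins the current epoch around its accesses, while
 * a writer that has already unlinked an object defers its deallocation
 * through the rcu_free() wrapper from frrcu.h:
 *
 *	struct item {
 *		struct item *next;
 *		int value;
 *		struct rcu_head rcu;
 *	};
 *
 *	void reader(struct item *head)
 *	{
 *		rcu_read_lock();
 *		for (struct item *i = head; i; i = i->next)
 *			consume(i->value);
 *		rcu_read_unlock();
 *	}
 *
 *	void item_delete(struct item *i)
 *	{
 *		// caller has already unlinked i from the shared list
 *		rcu_free(MTYPE_ITEM, i, rcu);	// freed once all readers
 *						// have left this epoch
 *	}
 */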

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <pthread.h>
#ifdef HAVE_PTHREAD_NP_H
#include <pthread_np.h>
#endif
#include <string.h>
#include <unistd.h>
#include <signal.h>

#include "frrcu.h"
#include "seqlock.h"
#include "atomlist.h"

DEFINE_MTYPE_STATIC(LIB, RCU_THREAD, "RCU thread");
DEFINE_MTYPE_STATIC(LIB, RCU_NEXT, "RCU sequence barrier");

DECLARE_ATOMLIST(rcu_heads, struct rcu_head, head);

PREDECL_ATOMLIST(rcu_threads);
struct rcu_thread {
	struct rcu_threads_item head;

	struct rcu_head rcu_head;

	struct seqlock rcu;

	/* only accessed by thread itself, not atomic */
	unsigned depth;
};
DECLARE_ATOMLIST(rcu_threads, struct rcu_thread, head);

static const struct rcu_action rcua_next = { .type = RCUA_NEXT };
static const struct rcu_action rcua_end = { .type = RCUA_END };
static const struct rcu_action rcua_close = { .type = RCUA_CLOSE };

struct rcu_next {
	struct rcu_head head_free;
	struct rcu_head head_next;
};

#define rcu_free_internal(mtype, ptr, field)                                   \
	do {                                                                   \
		typeof(ptr) _ptr = (ptr);                                      \
		struct rcu_head *_rcu_head = &_ptr->field;                     \
		static const struct rcu_action _rcu_action = {                 \
			.type = RCUA_FREE,                                     \
			.u.free = {                                            \
				.mt = mtype,                                   \
				.offset = offsetof(typeof(*_ptr), field),      \
			},                                                     \
		};                                                             \
		_rcu_head->action = &_rcu_action;                              \
		rcu_heads_add_tail(&rcu_heads, _rcu_head);                     \
	} while (0)
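
/* For example, rcu_free_internal(MTYPE_RCU_THREAD, rt, rcu_head) (as used in
 * rcu_thread_unprepare() below) queues rt->rcu_head with a static RCUA_FREE
 * action recording the memtype and offsetof(struct rcu_thread, rcu_head);
 * rcu_do() later subtracts that offset from the rcu_head pointer to recover
 * rt and hand it to qfree().
 */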

/* primary global RCU position */
static struct seqlock rcu_seq;
/* this is set to rcu_seq whenever something is added on the RCU queue.
 * rcu_read_lock() and rcu_read_unlock() will then bump rcu_seq up one step.
 */
static _Atomic seqlock_val_t rcu_dirty;

static struct rcu_threads_head rcu_threads;
static struct rcu_heads_head rcu_heads;

/* main thread & RCU sweeper have pre-setup rcu_thread structures. The
 * reasons are different:
 *
 * - rcu_thread_main is there because the main thread isn't started like
 *   other threads, it's implicitly created when the program is started. So
 *   rcu_thread_main matches up implicitly.
 *
 * - rcu_thread_rcu isn't actually put on the rcu_threads list (makes no
 *   sense really), it only exists so we can call RCU-using functions from
 *   the RCU thread without special handling in rcu_read_lock/unlock.
 */
static struct rcu_thread rcu_thread_main;
static struct rcu_thread rcu_thread_rcu;

static pthread_t rcu_pthread;
static pthread_key_t rcu_thread_key;
static bool rcu_active;

static void rcu_start(void);
static void rcu_bump(void);

/*
 * preinitialization for main thread
 */
static void rcu_thread_end(void *rcu_thread);

static void rcu_preinit(void) __attribute__((constructor));
static void rcu_preinit(void)
{
	struct rcu_thread *rt;

	rt = &rcu_thread_main;
	rt->depth = 1;
	seqlock_init(&rt->rcu);
	seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);

	pthread_key_create(&rcu_thread_key, rcu_thread_end);
	pthread_setspecific(rcu_thread_key, rt);

	rcu_threads_add_tail(&rcu_threads, rt);

	/* RCU sweeper's rcu_thread is a dummy, NOT added to rcu_threads */
	rt = &rcu_thread_rcu;
	rt->depth = 1;

	seqlock_init(&rcu_seq);
	seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
}

static struct rcu_thread *rcu_self(void)
{
	return (struct rcu_thread *)pthread_getspecific(rcu_thread_key);
}

/*
 * thread management (for the non-main thread)
 */
struct rcu_thread *rcu_thread_prepare(void)
{
	struct rcu_thread *rt, *cur;

	rcu_assert_read_locked();

	if (!rcu_active)
		rcu_start();

	cur = rcu_self();
	assert(cur->depth);

	/* new thread always starts with rcu_read_lock held at depth 1, and
	 * holding the same epoch as the parent (this makes it possible to
	 * use RCU for things passed into the thread through its arg)
	 */
	rt = XCALLOC(MTYPE_RCU_THREAD, sizeof(*rt));
	rt->depth = 1;

	seqlock_init(&rt->rcu);
	seqlock_acquire(&rt->rcu, &cur->rcu);

	rcu_threads_add_tail(&rcu_threads, rt);

	return rt;
}

void rcu_thread_start(struct rcu_thread *rt)
{
	pthread_setspecific(rcu_thread_key, rt);
}
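
/* Sketch of the intended call pattern (hypothetical caller code, shown only
 * for illustration):
 *
 *	// in the parent, with rcu_read_lock() held:
 *	struct rcu_thread *rt = rcu_thread_prepare();
 *	if (pthread_create(&tid, NULL, child_fn, rt))
 *		rcu_thread_unprepare(rt);	// undo on failure
 *	rcu_read_unlock();
 *
 *	// first thing in child_fn(arg):
 *	rcu_thread_start(arg);		// bind the rcu_thread to this pthread
 *	...				// runs read-locked at depth 1, holding
 *	rcu_read_unlock();		// the parent's epoch, until it drops
 *					// the inherited lock
 */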

void rcu_thread_unprepare(struct rcu_thread *rt)
{
	if (rt == &rcu_thread_rcu)
		return;

	rt->depth = 1;
	seqlock_acquire(&rt->rcu, &rcu_seq);

	rcu_bump();
	if (rt != &rcu_thread_main)
		/* this free() happens after seqlock_release() below */
		rcu_free_internal(MTYPE_RCU_THREAD, rt, rcu_head);

	rcu_threads_del(&rcu_threads, rt);
	seqlock_release(&rt->rcu);
}

static void rcu_thread_end(void *rtvoid)
{
	struct rcu_thread *rt = rtvoid;
	rcu_thread_unprepare(rt);
}

/*
 * main RCU control aspects
 */

static void rcu_bump(void)
{
	struct rcu_next *rn;

	rn = XMALLOC(MTYPE_RCU_NEXT, sizeof(*rn));

	/* note: each RCUA_NEXT item corresponds to exactly one seqno bump.
	 * This means we don't need to communicate which seqno is which
	 * RCUA_NEXT, since we really don't care.
	 */

	/*
	 * Important race condition: while rcu_heads_add_tail is executing,
	 * there is an intermediate point where the rcu_heads "last" pointer
	 * already points to rn->head_next, but rn->head_next isn't added to
	 * the list yet. That means any other "add_tail" calls append to this
	 * item, which isn't fully on the list yet. Freeze this thread at
	 * that point and look at another thread doing a rcu_bump. It adds
	 * these two items and then does a seqlock_bump. But the rcu_heads
	 * list is still "interrupted" and there's no RCUA_NEXT on the list
	 * yet (from either the frozen thread or the second thread). So
	 * rcu_main() might actually hit the end of the list at the
	 * "interrupt".
	 *
	 * This situation is prevented by requiring that rcu_read_lock is held
	 * for any calls to rcu_bump, since if we're holding the current RCU
	 * epoch, that means rcu_main can't be chewing on rcu_heads and hit
	 * that interruption point. Only once the thread has continued to
	 * rcu_read_unlock() - and therefore completed the add_tail - does the
	 * RCU sweeper gobble up the epoch, and it can then be sure to find at
	 * least the RCUA_NEXT and RCUA_FREE items on rcu_heads.
	 */
	rn->head_next.action = &rcua_next;
	rcu_heads_add_tail(&rcu_heads, &rn->head_next);

	/* free rn that we allocated above.
	 *
	 * This is INTENTIONALLY not built into the RCUA_NEXT action. This
	 * ensures that after the action above is popped off the queue, there
	 * is still at least 1 item on the RCU queue. This means we never
	 * delete the last item, which is extremely important since it keeps
	 * the atomlist ->last pointer alive and well.
	 *
	 * If we were to "run dry" on the RCU queue, add_tail may run into the
	 * "last item is being deleted - start over" case, and then we may end
	 * up accessing old RCU queue items that are already free'd.
	 */
	rcu_free_internal(MTYPE_RCU_NEXT, rn, head_free);

	/* Only allow the RCU sweeper to run after these 2 items are queued.
	 *
	 * If another thread enqueues some RCU action in the intermediate
	 * window here, nothing bad happens - the queued action is associated
	 * with a larger seq# than strictly necessary. Thus, it might get
	 * executed a bit later, but that's not a problem.
	 *
	 * If another thread acquires the read lock in this window, it holds
	 * the previous epoch, but its RCU queue actions will be in the next
	 * epoch. This isn't a problem either, just a tad inefficient.
	 */
	seqlock_bump(&rcu_seq);
}

static void rcu_bump_maybe(void)
{
	seqlock_val_t dirty;

	dirty = atomic_load_explicit(&rcu_dirty, memory_order_relaxed);
	/* no problem if we race here and multiple threads bump rcu_seq;
	 * bumping too much causes no issues while not bumping enough will
	 * result in delayed cleanup
	 */
	if (dirty == seqlock_cur(&rcu_seq))
		rcu_bump();
}

void rcu_read_lock(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt);
	if (rt->depth++ > 0)
		return;

	seqlock_acquire(&rt->rcu, &rcu_seq);
	/* need to hold RCU for bump ... */
	rcu_bump_maybe();
	/* ... but no point in holding the old epoch if we just bumped */
	seqlock_acquire(&rt->rcu, &rcu_seq);
}

void rcu_read_unlock(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt && rt->depth);
	if (--rt->depth > 0)
		return;
	rcu_bump_maybe();
	seqlock_release(&rt->rcu);
}

void rcu_assert_read_locked(void)
{
	struct rcu_thread *rt = rcu_self();
	assert(rt && rt->depth && seqlock_held(&rt->rcu));
}

void rcu_assert_read_unlocked(void)
{
	struct rcu_thread *rt = rcu_self();
	assert(rt && !rt->depth && !seqlock_held(&rt->rcu));
}

/*
 * RCU resource-release thread
 */

static void *rcu_main(void *arg);

static void rcu_start(void)
{
	/* ensure we never handle signals on the RCU thread by blocking
	 * everything here (new thread inherits signal mask)
	 */
	sigset_t oldsigs, blocksigs;

	sigfillset(&blocksigs);
	pthread_sigmask(SIG_BLOCK, &blocksigs, &oldsigs);

	rcu_active = true;

	assert(!pthread_create(&rcu_pthread, NULL, rcu_main, NULL));

	pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);

#ifdef HAVE_PTHREAD_SETNAME_NP
# ifdef GNU_LINUX
	pthread_setname_np(rcu_pthread, "RCU sweeper");
# elif defined(__NetBSD__)
	pthread_setname_np(rcu_pthread, "RCU sweeper", NULL);
# endif
#elif defined(HAVE_PTHREAD_SET_NAME_NP)
	pthread_set_name_np(rcu_pthread, "RCU sweeper");
#endif
}

static void rcu_do(struct rcu_head *rh)
{
	struct rcu_head_close *rhc;
	void *p;

	switch (rh->action->type) {
	case RCUA_FREE:
		p = (char *)rh - rh->action->u.free.offset;
		if (rh->action->u.free.mt)
			qfree(rh->action->u.free.mt, p);
		else
			free(p);
		break;
	case RCUA_CLOSE:
		rhc = container_of(rh, struct rcu_head_close,
				   rcu_head);
		close(rhc->fd);
		break;
	case RCUA_CALL:
		p = (char *)rh - rh->action->u.call.offset;
		rh->action->u.call.fptr(p);
		break;

	case RCUA_INVALID:
	case RCUA_NEXT:
	case RCUA_END:
	default:
		assert(0);
	}
}
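
/* RCUA_FREE items are queued by rcu_free_internal() above (and, presumably,
 * the public rcu_free() wrapper in frrcu.h), RCUA_CLOSE by rcu_close() at
 * the bottom of this file; RCUA_CALL carries a caller-supplied function plus
 * an offset used to recover the enclosing object, analogous to the free
 * case.
 */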

static void rcu_watchdog(struct rcu_thread *rt)
{
#if 0
	/* future work: print a backtrace for the thread that's holding up
	 * RCU. The only (good) way of doing that is to send a signal to the
	 * other thread, save away the backtrace in the signal handler, and
	 * block here until the signal is done processing.
	 *
	 * Just haven't implemented that yet.
	 */
	fprintf(stderr, "RCU watchdog %p\n", rt);
#endif
}

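/* The sweeper loop below works in epochs: wait for rcu_seq to move past
 * rcuval, then wait (with a 100ms timeout feeding the rcu_watchdog hook)
 * until every thread on rcu_threads has left that epoch, then pop and
 * process rcu_heads entries up to the next RCUA_NEXT marker, and finally
 * advance rcuval by SEQLOCK_INCR to wait for the next epoch.
 */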
static void *rcu_main(void *arg)
{
	struct rcu_thread *rt;
	struct rcu_head *rh = NULL;
	bool end = false;
	struct timespec maxwait;

	seqlock_val_t rcuval = SEQLOCK_STARTVAL;

	pthread_setspecific(rcu_thread_key, &rcu_thread_rcu);

	while (!end) {
		seqlock_wait(&rcu_seq, rcuval);

		/* RCU watchdog timeout, TODO: configurable value */
		clock_gettime(CLOCK_MONOTONIC, &maxwait);
		maxwait.tv_nsec += 100 * 1000 * 1000;
		if (maxwait.tv_nsec >= 1000000000) {
			maxwait.tv_sec++;
			maxwait.tv_nsec -= 1000000000;
		}

		frr_each (rcu_threads, &rcu_threads, rt)
			if (!seqlock_timedwait(&rt->rcu, rcuval, &maxwait)) {
				rcu_watchdog(rt);
				seqlock_wait(&rt->rcu, rcuval);
			}

		while ((rh = rcu_heads_pop(&rcu_heads))) {
			if (rh->action->type == RCUA_NEXT)
				break;
			else if (rh->action->type == RCUA_END)
				end = true;
			else
				rcu_do(rh);
		}

		rcuval += SEQLOCK_INCR;
	}

	/* rcu_shutdown can only be called singlethreaded, and it does a
	 * pthread_join, so it should be impossible that anything ended up
	 * on the queue after RCUA_END
	 */
#if 1
	assert(!rcu_heads_first(&rcu_heads));
#else
	while ((rh = rcu_heads_pop(&rcu_heads)))
		if (rh->action->type >= RCUA_FREE)
			rcu_do(rh);
#endif
	return NULL;
}

void rcu_shutdown(void)
{
	static struct rcu_head rcu_head_end;
	struct rcu_thread *rt = rcu_self();
	void *retval;

	if (!rcu_active)
		return;

	rcu_assert_read_locked();
	assert(rcu_threads_count(&rcu_threads) == 1);

	rcu_enqueue(&rcu_head_end, &rcua_end);

	rt->depth = 0;
	seqlock_release(&rt->rcu);
	seqlock_release(&rcu_seq);
	rcu_active = false;

	/* clearing rcu_active is before pthread_join in case we hang in
	 * pthread_join & get a SIGTERM or something - in that case, just
	 * ignore the maybe-still-running RCU thread
	 */
	if (pthread_join(rcu_pthread, &retval) == 0) {
		seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
		seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);
		rt->depth = 1;
	}
}

/*
 * RCU'd free functions
 */

void rcu_enqueue(struct rcu_head *rh, const struct rcu_action *action)
{
	/* refer to rcu_bump() for why we need to hold RCU when adding items
	 * to rcu_heads
	 */
	rcu_assert_read_locked();

	rh->action = action;

	if (!rcu_active) {
		rcu_do(rh);
		return;
	}
	rcu_heads_add_tail(&rcu_heads, rh);
	atomic_store_explicit(&rcu_dirty, seqlock_cur(&rcu_seq),
			      memory_order_relaxed);
}

void rcu_close(struct rcu_head_close *rhc, int fd)
{
	rhc->fd = fd;
	rcu_enqueue(&rhc->rcu_head, &rcua_close);
}
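
/* Usage sketch for the deferred close (hypothetical caller; the struct and
 * MTYPE names are made up for illustration):
 *
 *	struct conn {
 *		int fd;
 *		struct rcu_head_close rcu_fd;
 *		struct rcu_head rcu_mem;
 *	};
 *
 *	void conn_drop(struct conn *c)	// called with rcu_read_lock() held
 *	{
 *		// unlink c from shared data structures first, then:
 *		rcu_close(&c->rcu_fd, c->fd);	  // fd closed by the sweeper
 *		rcu_free(MTYPE_CONN, c, rcu_mem); // memory freed afterwards
 *	}
 */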