// SPDX-License-Identifier: ISC
/*
 * Copyright (c) 2017-19 David Lamparter, for NetDEF, Inc.
 */

/* implementation notes: this is an epoch-based RCU implementation. rcu_seq
 * (global variable) counts the current epoch. Threads hold a specific epoch
 * in rcu_read_lock(). This is the oldest epoch a thread might be accessing
 * data from.
 *
 * The rcu_seq global is only pushed forward on rcu_read_lock() and
 * rcu_read_unlock() calls. This makes things a tad more efficient since
 * those are the only places it matters:
 * - on rcu_read_lock, we don't want to hold an old epoch pointlessly
 * - on rcu_read_unlock, we want to make sure we're not stuck on an old epoch
 *   when heading into a long idle period where no thread holds RCU
 *
 * rcu_thread structures themselves are RCU-free'd.
 *
 * rcu_head structures are the most iffy; normally for an ATOMLIST we would
 * need to make sure we use rcu_free or pthread_rwlock to deallocate old items
 * to prevent ABA or use-after-free problems. However, our ATOMLIST code
 * guarantees that if the list remains non-empty in all cases, we only need
 * the "last" pointer to do an "add_tail()", i.e. we can't run into ABA/UAF
 * issues - but we do need to keep at least 1 item on the list.
 *
 * (Search the atomlist code for all uses of "last")
 */
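
/* Usage sketch (reading aid only, hypothetical caller code): readers bracket
 * their accesses with rcu_read_lock()/rcu_read_unlock(); writers unlink an
 * object and hand it to the sweeper instead of freeing it directly. This
 * assumes a caller-defined "struct item" with an embedded struct rcu_head
 * member named rcu_head, a hypothetical MTYPE_ITEM, and that the public
 * rcu_free() wrapper in frrcu.h mirrors rcu_free_internal() below.
 *
 *	// reader
 *	rcu_read_lock();
 *	struct item *it = atomic_load_explicit(&shared, memory_order_acquire);
 *	// ... use it; it won't be freed while we hold our epoch ...
 *	rcu_read_unlock();
 *
 *	// writer (also under rcu_read_lock, see rcu_enqueue/rcu_bump)
 *	rcu_read_lock();
 *	struct item *old = atomic_exchange_explicit(&shared, replacement,
 *						    memory_order_acq_rel);
 *	rcu_free(MTYPE_ITEM, old, rcu_head);
 *	rcu_read_unlock();
 */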

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <pthread.h>
#ifdef HAVE_PTHREAD_NP_H
#include <pthread_np.h>
#endif
#include <string.h>
#include <unistd.h>
#include <signal.h>

#include "frrcu.h"
#include "seqlock.h"
#include "atomlist.h"

DEFINE_MTYPE_STATIC(LIB, RCU_THREAD, "RCU thread");
DEFINE_MTYPE_STATIC(LIB, RCU_NEXT, "RCU sequence barrier");

DECLARE_ATOMLIST(rcu_heads, struct rcu_head, head);

PREDECL_ATOMLIST(rcu_threads);
struct rcu_thread {
	struct rcu_threads_item head;

	struct rcu_head rcu_head;

	struct seqlock rcu;

	/* only accessed by thread itself, not atomic */
	unsigned depth;
};
DECLARE_ATOMLIST(rcu_threads, struct rcu_thread, head);

static const struct rcu_action rcua_next = { .type = RCUA_NEXT };
static const struct rcu_action rcua_end = { .type = RCUA_END };
static const struct rcu_action rcua_close = { .type = RCUA_CLOSE };

struct rcu_next {
	struct rcu_head head_free;
	struct rcu_head head_next;
};

#define rcu_free_internal(mtype, ptr, field)                                   \
	do {                                                                   \
		typeof(ptr) _ptr = (ptr);                                      \
		struct rcu_head *_rcu_head = &_ptr->field;                     \
		static const struct rcu_action _rcu_action = {                 \
			.type = RCUA_FREE,                                     \
			.u.free = {                                            \
				.mt = mtype,                                   \
				.offset = offsetof(typeof(*_ptr), field),      \
			},                                                     \
		};                                                             \
		_rcu_head->action = &_rcu_action;                              \
		rcu_heads_add_tail(&rcu_heads, _rcu_head);                     \
	} while (0)
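
/* The stored offset lets rcu_do() recover the enclosing object from the
 * embedded rcu_head, e.g. with a hypothetical layout:
 *
 *	struct item {
 *		int foo;
 *		struct rcu_head rcu_head;   // offsetof(struct item, rcu_head)
 *	};
 *
 *	// in rcu_do(), RCUA_FREE case:
 *	//   p = (char *)rh - rh->action->u.free.offset;
 *	// p now points back at the start of struct item, which is what gets
 *	// handed to qfree()/free().
 */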

/* primary global RCU position */
static struct seqlock rcu_seq;
/* this is set to rcu_seq whenever something is added on the RCU queue.
 * rcu_read_lock() and rcu_read_unlock() will then bump rcu_seq up one step.
 */
static _Atomic seqlock_val_t rcu_dirty;

static struct rcu_threads_head rcu_threads;
static struct rcu_heads_head rcu_heads;

/* main thread & RCU sweeper have pre-setup rcu_thread structures. The
 * reasons are different:
 *
 * - rcu_thread_main is there because the main thread isn't started like
 *   other threads; it's implicitly created when the program is started. So
 *   rcu_thread_main matches up implicitly.
 *
 * - rcu_thread_rcu isn't actually put on the rcu_threads list (that would
 *   make no sense); it only exists so we can call RCU-using functions from
 *   the RCU thread without special handling in rcu_read_lock/unlock.
 */
static struct rcu_thread rcu_thread_main;
static struct rcu_thread rcu_thread_rcu;

static pthread_t rcu_pthread;
static pthread_key_t rcu_thread_key;
static bool rcu_active;

static void rcu_start(void);
static void rcu_bump(void);

/*
 * preinitialization for main thread
 */
static void rcu_thread_end(void *rcu_thread);

static void rcu_preinit(void) __attribute__((constructor));
static void rcu_preinit(void)
{
	struct rcu_thread *rt;

	rt = &rcu_thread_main;
	rt->depth = 1;
	seqlock_init(&rt->rcu);
	seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);

	pthread_key_create(&rcu_thread_key, rcu_thread_end);
	pthread_setspecific(rcu_thread_key, rt);

	rcu_threads_add_tail(&rcu_threads, rt);

	/* RCU sweeper's rcu_thread is a dummy, NOT added to rcu_threads */
	rt = &rcu_thread_rcu;
	rt->depth = 1;

	seqlock_init(&rcu_seq);
	seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
}

static struct rcu_thread *rcu_self(void)
{
	return (struct rcu_thread *)pthread_getspecific(rcu_thread_key);
}

/*
 * thread management (for the non-main thread)
 */
struct rcu_thread *rcu_thread_prepare(void)
{
	struct rcu_thread *rt, *cur;

	rcu_assert_read_locked();

	if (!rcu_active)
		rcu_start();

	cur = rcu_self();
	assert(cur->depth);

	/* new thread always starts with rcu_read_lock held at depth 1, and
	 * holding the same epoch as the parent (this makes it possible to
	 * use RCU for things passed into the thread through its arg)
	 */
	rt = XCALLOC(MTYPE_RCU_THREAD, sizeof(*rt));
	rt->depth = 1;

	seqlock_init(&rt->rcu);
	seqlock_acquire(&rt->rcu, &cur->rcu);

	rcu_threads_add_tail(&rcu_threads, rt);

	return rt;
}

void rcu_thread_start(struct rcu_thread *rt)
{
	pthread_setspecific(rcu_thread_key, rt);
}
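
/* Thread startup handoff, sketched with a hypothetical caller (in FRR this
 * is typically handled by the pthread wrapper code rather than by users
 * directly):
 *
 *	// parent, while holding rcu_read_lock():
 *	struct rcu_thread *rt = rcu_thread_prepare();
 *	// pass rt to the new thread, e.g. via its pthread_create() arg ...
 *
 *	// child, first thing on entry:
 *	rcu_thread_start(rt);
 *	// child now holds rcu_read_lock at depth 1, on the parent's epoch,
 *	// so pointers in its startup arg stay valid until it unlocks.
 */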

void rcu_thread_unprepare(struct rcu_thread *rt)
{
	if (rt == &rcu_thread_rcu)
		return;

	rt->depth = 1;
	seqlock_acquire(&rt->rcu, &rcu_seq);

	rcu_bump();
	if (rt != &rcu_thread_main)
		/* this free() happens after seqlock_release() below */
		rcu_free_internal(MTYPE_RCU_THREAD, rt, rcu_head);

	rcu_threads_del(&rcu_threads, rt);
	seqlock_release(&rt->rcu);
}

static void rcu_thread_end(void *rtvoid)
{
	struct rcu_thread *rt = rtvoid;
	rcu_thread_unprepare(rt);
}

/*
 * main RCU control aspects
 */

static void rcu_bump(void)
{
	struct rcu_next *rn;

	rn = XMALLOC(MTYPE_RCU_NEXT, sizeof(*rn));

	/* note: each RCUA_NEXT item corresponds to exactly one seqno bump.
	 * This means we don't need to communicate which seqno is which
	 * RCUA_NEXT, since we really don't care.
	 */

	/*
	 * Important race condition: while rcu_heads_add_tail is executing,
	 * there is an intermediate point where the rcu_heads "last" pointer
	 * already points to rn->head_next, but rn->head_next isn't added to
	 * the list yet. That means any other "add_tail" calls append to this
	 * item, which isn't fully on the list yet. Freeze this thread at
	 * that point and look at another thread doing a rcu_bump. It adds
	 * these two items and then does a seqlock_bump. But the rcu_heads
	 * list is still "interrupted" and there's no RCUA_NEXT on the list
	 * yet (from either the frozen thread or the second thread). So
	 * rcu_main() might actually hit the end of the list at the
	 * "interrupt".
	 *
	 * This situation is prevented by requiring that rcu_read_lock is held
	 * for any calls to rcu_bump, since if we're holding the current RCU
	 * epoch, that means rcu_main can't be chewing on rcu_heads and hit
	 * that interruption point. Only once the thread has continued to
	 * rcu_read_unlock() - and therefore completed the add_tail - does the
	 * RCU sweeper gobble up the epoch, and it can then be sure to find at
	 * least the RCUA_NEXT and RCUA_FREE items on rcu_heads.
	 */
	rn->head_next.action = &rcua_next;
	rcu_heads_add_tail(&rcu_heads, &rn->head_next);

	/* free rn that we allocated above.
	 *
	 * This is INTENTIONALLY not built into the RCUA_NEXT action. This
	 * ensures that after the action above is popped off the queue, there
	 * is still at least 1 item on the RCU queue. This means we never
	 * delete the last item, which is extremely important since it keeps
	 * the atomlist ->last pointer alive and well.
	 *
	 * If we were to "run dry" on the RCU queue, add_tail may run into the
	 * "last item is being deleted - start over" case, and then we may end
	 * up accessing old RCU queue items that are already free'd.
	 */
	rcu_free_internal(MTYPE_RCU_NEXT, rn, head_free);

	/* Only allow the RCU sweeper to run after these 2 items are queued.
	 *
	 * If another thread enqueues some RCU action in the intermediate
	 * window here, nothing bad happens - the queued action is associated
	 * with a larger seq# than strictly necessary. Thus, it might get
	 * executed a bit later, but that's not a problem.
	 *
	 * If another thread acquires the read lock in this window, it holds
	 * the previous epoch, but its RCU queue actions will be in the next
	 * epoch. This isn't a problem either, just a tad inefficient.
	 */
	seqlock_bump(&rcu_seq);
}

static void rcu_bump_maybe(void)
{
	seqlock_val_t dirty;

	dirty = atomic_load_explicit(&rcu_dirty, memory_order_relaxed);
	/* no problem if we race here and multiple threads bump rcu_seq;
	 * bumping too much causes no issues, while not bumping enough
	 * results in delayed cleanup
	 */
	if (dirty == seqlock_cur(&rcu_seq))
		rcu_bump();
}

void rcu_read_lock(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt);
	if (rt->depth++ > 0)
		return;

	seqlock_acquire(&rt->rcu, &rcu_seq);
	/* need to hold RCU for bump ... */
	rcu_bump_maybe();
	/* ... but no point in holding the old epoch if we just bumped */
	seqlock_acquire(&rt->rcu, &rcu_seq);
}

void rcu_read_unlock(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt && rt->depth);
	if (--rt->depth > 0)
		return;
	rcu_bump_maybe();
	seqlock_release(&rt->rcu);
}
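
/* Nesting behaviour of the read lock, illustrated with a hypothetical caller
 * (grounded purely in the depth counter above):
 *
 *	rcu_read_lock();     // depth 0 -> 1, acquires the current epoch
 *	rcu_read_lock();     // depth 1 -> 2, early return, no seqlock work
 *	...
 *	rcu_read_unlock();   // depth 2 -> 1, still holding the epoch
 *	rcu_read_unlock();   // depth 1 -> 0, maybe bumps, releases the epoch
 */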

void rcu_assert_read_locked(void)
{
	struct rcu_thread *rt = rcu_self();
	assert(rt && rt->depth && seqlock_held(&rt->rcu));
}

void rcu_assert_read_unlocked(void)
{
	struct rcu_thread *rt = rcu_self();
	assert(rt && !rt->depth && !seqlock_held(&rt->rcu));
}

/*
 * RCU resource-release thread
 */

static void *rcu_main(void *arg);

static void rcu_start(void)
{
	/* ensure we never handle signals on the RCU thread by blocking
	 * everything here (new thread inherits signal mask)
	 */
	sigset_t oldsigs, blocksigs;

	sigfillset(&blocksigs);
	pthread_sigmask(SIG_BLOCK, &blocksigs, &oldsigs);

	rcu_active = true;

	assert(!pthread_create(&rcu_pthread, NULL, rcu_main, NULL));

	pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);

#ifdef HAVE_PTHREAD_SETNAME_NP
# ifdef GNU_LINUX
	pthread_setname_np(rcu_pthread, "RCU sweeper");
# elif defined(__NetBSD__)
	pthread_setname_np(rcu_pthread, "RCU sweeper", NULL);
# endif
#elif defined(HAVE_PTHREAD_SET_NAME_NP)
	pthread_set_name_np(rcu_pthread, "RCU sweeper");
#endif
}

static void rcu_do(struct rcu_head *rh)
{
	struct rcu_head_close *rhc;
	void *p;

	switch (rh->action->type) {
	case RCUA_FREE:
		p = (char *)rh - rh->action->u.free.offset;
		if (rh->action->u.free.mt)
			qfree(rh->action->u.free.mt, p);
		else
			free(p);
		break;
	case RCUA_CLOSE:
		rhc = container_of(rh, struct rcu_head_close,
				   rcu_head);
		close(rhc->fd);
		break;
	case RCUA_CALL:
		p = (char *)rh - rh->action->u.call.offset;
		rh->action->u.call.fptr(p);
		break;

	case RCUA_INVALID:
	case RCUA_NEXT:
	case RCUA_END:
	default:
		assert(0);
	}
}
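
/* RCUA_CALL exists for objects that need more teardown than a plain free(),
 * e.g. the following hypothetical sketch, assuming the rcu_call() convenience
 * macro in frrcu.h builds an action the same way rcu_free_internal() does for
 * RCUA_FREE (struct item and MTYPE_ITEM are made up for illustration):
 *
 *	static void item_destroy(struct item *it)
 *	{
 *		// ... cleanup beyond a plain free ...
 *		XFREE(MTYPE_ITEM, it);
 *	}
 *	...
 *	rcu_call(item_destroy, it, rcu_head);
 *
 * The sweeper then invokes item_destroy(it) once all readers have left the
 * epoch in which the call was enqueued.
 */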

static void rcu_watchdog(struct rcu_thread *rt)
{
#if 0
	/* future work: print a backtrace for the thread that's holding up
	 * RCU. The only (good) way of doing that is to send a signal to the
	 * other thread, save away the backtrace in the signal handler, and
	 * block here until the signal is done processing.
	 *
	 * Just haven't implemented that yet.
	 */
	fprintf(stderr, "RCU watchdog %p\n", rt);
#endif
}

static void *rcu_main(void *arg)
{
	struct rcu_thread *rt;
	struct rcu_head *rh = NULL;
	bool end = false;
	struct timespec maxwait;

	seqlock_val_t rcuval = SEQLOCK_STARTVAL;

	pthread_setspecific(rcu_thread_key, &rcu_thread_rcu);

	while (!end) {
		seqlock_wait(&rcu_seq, rcuval);

		/* RCU watchdog timeout, TODO: configurable value */
		clock_gettime(CLOCK_MONOTONIC, &maxwait);
		maxwait.tv_nsec += 100 * 1000 * 1000;
		if (maxwait.tv_nsec >= 1000000000) {
			maxwait.tv_sec++;
			maxwait.tv_nsec -= 1000000000;
		}

		frr_each (rcu_threads, &rcu_threads, rt)
			if (!seqlock_timedwait(&rt->rcu, rcuval, &maxwait)) {
				rcu_watchdog(rt);
				seqlock_wait(&rt->rcu, rcuval);
			}

		while ((rh = rcu_heads_pop(&rcu_heads))) {
			if (rh->action->type == RCUA_NEXT)
				break;
			else if (rh->action->type == RCUA_END)
				end = true;
			else
				rcu_do(rh);
		}

		rcuval += SEQLOCK_INCR;
	}

	/* rcu_shutdown can only be called singlethreaded, and it does a
	 * pthread_join, so it should be impossible that anything ended up
	 * on the queue after RCUA_END
	 */
#if 1
	assert(!rcu_heads_first(&rcu_heads));
#else
	while ((rh = rcu_heads_pop(&rcu_heads)))
		if (rh->action->type >= RCUA_FREE)
			rcu_do(rh);
#endif
	return NULL;
}

void rcu_shutdown(void)
{
	static struct rcu_head rcu_head_end;
	struct rcu_thread *rt = rcu_self();
	void *retval;

	if (!rcu_active)
		return;

	rcu_assert_read_locked();
	assert(rcu_threads_count(&rcu_threads) == 1);

	rcu_enqueue(&rcu_head_end, &rcua_end);

	rt->depth = 0;
	seqlock_release(&rt->rcu);
	seqlock_release(&rcu_seq);
	rcu_active = false;

	/* rcu_active is cleared before the pthread_join in case we hang in
	 * pthread_join and get a SIGTERM or similar - in that case, just
	 * ignore the maybe-still-running RCU thread
	 */
	if (pthread_join(rcu_pthread, &retval) == 0) {
		seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
		seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);
		rt->depth = 1;
	}
}

/*
 * RCU'd free functions
 */

void rcu_enqueue(struct rcu_head *rh, const struct rcu_action *action)
{
	/* refer to rcu_bump() for why we need to hold RCU when adding items
	 * to rcu_heads
	 */
	rcu_assert_read_locked();

	rh->action = action;

	if (!rcu_active) {
		rcu_do(rh);
		return;
	}
	rcu_heads_add_tail(&rcu_heads, rh);
	atomic_store_explicit(&rcu_dirty, seqlock_cur(&rcu_seq),
			      memory_order_relaxed);
}

void rcu_close(struct rcu_head_close *rhc, int fd)
{
	rhc->fd = fd;
	rcu_enqueue(&rhc->rcu_head, &rcua_close);
}
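
/* rcu_close() usage sketch (hypothetical caller): the rcu_head_close must
 * stay valid until the sweeper runs, so it is typically embedded in the
 * structure that owns the fd, and both are torn down together. struct conn,
 * MTYPE_CONN and the rcu_free() wrapper from frrcu.h are assumptions here,
 * not part of this file.
 *
 *	struct conn {
 *		int fd;
 *		struct rcu_head_close head_close;  // defers the close()
 *		struct rcu_head head_free;         // defers the free()
 *	};
 *
 *	// under rcu_read_lock(), instead of close(c->fd) + free(c):
 *	rcu_close(&c->head_close, c->fd);
 *	rcu_free(MTYPE_CONN, c, head_free);
 *
 * Readers still referencing c in the current epoch can keep using the fd;
 * both the close and the free happen once they have all moved on.
 */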