lib/frrcu.c
/*
 * Copyright (c) 2017-19 David Lamparter, for NetDEF, Inc.
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

/* implementation notes: this is an epoch-based RCU implementation. rcu_seq
 * (global variable) counts the current epoch. Threads hold a specific epoch
 * in rcu_read_lock(). This is the oldest epoch a thread might be accessing
 * data from.
 *
 * The rcu_seq global is only pushed forward on rcu_read_lock() and
 * rcu_read_unlock() calls. This makes things a tad more efficient since
 * those are the only places it matters:
 * - on rcu_read_lock, we don't want to hold an old epoch pointlessly
 * - on rcu_read_unlock, we want to make sure we're not stuck on an old epoch
 *   when heading into a long idle period where no thread holds RCU
 *
 * rcu_thread structures themselves are RCU-free'd.
 *
 * rcu_head structures are the most iffy; normally for an ATOMLIST we would
 * need to make sure we use rcu_free or pthread_rwlock to deallocate old items
 * to prevent ABA or use-after-free problems. However, our ATOMLIST code
 * guarantees that as long as the list remains non-empty, an add_tail() only
 * needs the "last" pointer, so we can't run into ABA/UAF issues - but we do
 * need to keep at least 1 item on the list.
 *
 * (Search the atomlist code for all uses of "last")
 */
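
/* Reader-side usage sketch (illustrative only; "struct peer" and
 * peer_lookup() are hypothetical stand-ins, not part of this file):
 *
 *   rcu_read_lock();
 *   struct peer *p = peer_lookup(name);    // RCU-protected lookup
 *   if (p)
 *           do_something(p);               // p stays valid until unlock
 *   rcu_read_unlock();                     // epoch released; sweeper may free
 *
 * Writers unhook the object from its data structure and then queue it for
 * deferred reclamation (e.g. with rcu_enqueue() below); it is only freed once
 * every thread has moved past the epoch in which it was still reachable.
 */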

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <pthread.h>
#ifdef HAVE_PTHREAD_NP_H
#include <pthread_np.h>
#endif
#include <string.h>
#include <unistd.h>
#include <signal.h>

#include "frrcu.h"
#include "seqlock.h"
#include "atomlist.h"

DEFINE_MTYPE_STATIC(LIB, RCU_THREAD, "RCU thread")

DECLARE_ATOMLIST(rcu_heads, struct rcu_head, head)

PREDECL_ATOMLIST(rcu_threads)
struct rcu_thread {
        struct rcu_threads_item head;

        struct rcu_head rcu_head;

        struct seqlock rcu;

        /* only accessed by thread itself, not atomic */
        unsigned depth;
};
DECLARE_ATOMLIST(rcu_threads, struct rcu_thread, head)

static const struct rcu_action rcua_next = { .type = RCUA_NEXT };
static const struct rcu_action rcua_end = { .type = RCUA_END };
static const struct rcu_action rcua_close = { .type = RCUA_CLOSE };

struct rcu_next {
        struct rcu_head head_free;
        struct rcu_head head_next;
};

#define rcu_free_internal(mtype, ptr, field)                                   \
        do {                                                                   \
                typeof(ptr) _ptr = (ptr);                                      \
                struct rcu_head *_rcu_head = &_ptr->field;                     \
                static const struct rcu_action _rcu_action = {                 \
                        .type = RCUA_FREE,                                     \
                        .u.free = {                                            \
                                .mt = mtype,                                   \
                                .offset = offsetof(typeof(*_ptr), field),      \
                        },                                                     \
                };                                                             \
                _rcu_head->action = &_rcu_action;                              \
                rcu_heads_add_tail(&rcu_heads, _rcu_head);                     \
        } while (0)
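
/* Example of the embedded-rcu_head pattern (hypothetical "struct foo" and
 * MTYPE_FOO, for illustration only):
 *
 *   struct foo {
 *           struct rcu_head rcu;
 *           int value;
 *   };
 *   ...
 *   rcu_free_internal(MTYPE_FOO, foo_ptr, rcu);
 *
 * The offsetof() recorded in the action lets rcu_do() recover the start of
 * "struct foo" from the embedded rcu_head once the free actually runs.
 */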

/* primary global RCU position */
static struct seqlock rcu_seq;
/* this is set to rcu_seq whenever something is added to the RCU queue.
 * rcu_read_lock() and rcu_read_unlock() will then bump rcu_seq up one step.
 */
static _Atomic seqlock_val_t rcu_dirty;

static struct rcu_threads_head rcu_threads;
static struct rcu_heads_head rcu_heads;

/* main thread & RCU sweeper have pre-setup rcu_thread structures. The
 * reasons are different:
 *
 * - rcu_thread_main is there because the main thread isn't started like
 *   other threads; it's implicitly created when the program is started, so
 *   rcu_thread_main matches up implicitly.
 *
 * - rcu_thread_rcu isn't actually put on the rcu_threads list (makes no
 *   sense really); it only exists so we can call RCU-using functions from
 *   the RCU thread without special handling in rcu_read_lock/unlock.
 */
static struct rcu_thread rcu_thread_main;
static struct rcu_thread rcu_thread_rcu;

static pthread_t rcu_pthread;
static pthread_key_t rcu_thread_key;
static bool rcu_active;

static void rcu_start(void);
static void rcu_bump(void);

/*
 * preinitialization for main thread
 */
static void rcu_thread_end(void *rcu_thread);

static void rcu_preinit(void) __attribute__((constructor));
static void rcu_preinit(void)
{
        struct rcu_thread *rt;

        rt = &rcu_thread_main;
        rt->depth = 1;
        seqlock_init(&rt->rcu);
        seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);

        pthread_key_create(&rcu_thread_key, rcu_thread_end);
        pthread_setspecific(rcu_thread_key, rt);

        rcu_threads_add_tail(&rcu_threads, rt);

        /* RCU sweeper's rcu_thread is a dummy, NOT added to rcu_threads */
        rt = &rcu_thread_rcu;
        rt->depth = 1;

        seqlock_init(&rcu_seq);
        seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
}

static struct rcu_thread *rcu_self(void)
{
        return (struct rcu_thread *)pthread_getspecific(rcu_thread_key);
}

/*
 * thread management (for the non-main thread)
 */
struct rcu_thread *rcu_thread_prepare(void)
{
        struct rcu_thread *rt, *cur;

        rcu_assert_read_locked();

        if (!rcu_active)
                rcu_start();

        cur = rcu_self();
        assert(cur->depth);

        /* new thread always starts with rcu_read_lock held at depth 1, and
         * holding the same epoch as the parent (this makes it possible to
         * use RCU for things passed into the thread through its arg)
         */
        rt = XCALLOC(MTYPE_RCU_THREAD, sizeof(*rt));
        rt->depth = 1;

        seqlock_init(&rt->rcu);
        seqlock_acquire(&rt->rcu, &cur->rcu);

        rcu_threads_add_tail(&rcu_threads, rt);

        return rt;
}

void rcu_thread_start(struct rcu_thread *rt)
{
        pthread_setspecific(rcu_thread_key, rt);
}
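
/* Spawn sketch (hedged; in FRR this is normally done through the frr_pthread
 * wrappers rather than by calling these directly):
 *
 *   // parent, with rcu_read_lock() held:
 *   struct rcu_thread *rt = rcu_thread_prepare();
 *   pthread_create(&tid, NULL, thread_fn, rt);
 *
 *   // first thing inside thread_fn(arg):
 *   rcu_thread_start(arg);    // bind rt to this thread's TLS key
 *   ...
 *   rcu_read_unlock();        // drop the depth-1 lock inherited from prepare()
 */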

void rcu_thread_unprepare(struct rcu_thread *rt)
{
        if (rt == &rcu_thread_rcu)
                return;

        rt->depth = 1;
        seqlock_acquire(&rt->rcu, &rcu_seq);

        rcu_bump();
        if (rt != &rcu_thread_main)
                /* this free() happens after seqlock_release() below */
                rcu_free_internal(MTYPE_RCU_THREAD, rt, rcu_head);

        rcu_threads_del(&rcu_threads, rt);
        seqlock_release(&rt->rcu);
}

static void rcu_thread_end(void *rtvoid)
{
        struct rcu_thread *rt = rtvoid;
        rcu_thread_unprepare(rt);
}

/*
 * main RCU control aspects
 */

static void rcu_bump(void)
{
        struct rcu_next *rn;

        rn = XMALLOC(MTYPE_RCU_THREAD, sizeof(*rn));

        /* note: each RCUA_NEXT item corresponds to exactly one seqno bump.
         * This means we don't need to communicate which seqno is which
         * RCUA_NEXT, since we really don't care.
         */

        /*
         * Important race condition: while rcu_heads_add_tail is executing,
         * there is an intermediate point where the rcu_heads "last" pointer
         * already points to rn->head_next, but rn->head_next isn't added to
         * the list yet. That means any other "add_tail" calls append to this
         * item, which isn't fully on the list yet. Freeze this thread at
         * that point and look at another thread doing a rcu_bump. It adds
         * these two items and then does a seqlock_bump. But the rcu_heads
         * list is still "interrupted" and there's no RCUA_NEXT on the list
         * yet (from either the frozen thread or the second thread). So
         * rcu_main() might actually hit the end of the list at the
         * "interrupt".
         *
         * This situation is prevented by requiring that rcu_read_lock is held
         * for any calls to rcu_bump, since if we're holding the current RCU
         * epoch, that means rcu_main can't be chewing on rcu_heads and hit
         * that interruption point. Only once the thread has continued to
         * rcu_read_unlock() - and therefore completed the add_tail - does the
         * RCU sweeper gobble up the epoch, and by then it is sure to find at
         * least the RCUA_NEXT and RCUA_FREE items on rcu_heads.
         */
        rn->head_next.action = &rcua_next;
        rcu_heads_add_tail(&rcu_heads, &rn->head_next);

        /* free rn that we allocated above.
         *
         * This is INTENTIONALLY not built into the RCUA_NEXT action. This
         * ensures that after the action above is popped off the queue, there
         * is still at least 1 item on the RCU queue. This means we never
         * delete the last item, which is extremely important since it keeps
         * the atomlist ->last pointer alive and well.
         *
         * If we were to "run dry" on the RCU queue, add_tail may run into the
         * "last item is being deleted - start over" case, and then we may end
         * up accessing old RCU queue items that are already free'd.
         */
        rcu_free_internal(MTYPE_RCU_THREAD, rn, head_free);

        /* Only allow the RCU sweeper to run after these 2 items are queued.
         *
         * If another thread enqueues some RCU action in the intermediate
         * window here, nothing bad happens - the queued action is associated
         * with a larger seq# than strictly necessary. Thus, it might get
         * executed a bit later, but that's not a problem.
         *
         * If another thread acquires the read lock in this window, it holds
         * the previous epoch, but its RCU queue actions will be in the next
         * epoch. This isn't a problem either, just a tad inefficient.
         */
        seqlock_bump(&rcu_seq);
}
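
/* Queue shape after a single rcu_bump(), conceptually (a restatement of the
 * comments above, not an additional guarantee):
 *
 *   rcu_heads: ... -> [RCUA_NEXT: rn->head_next]
 *                  -> [RCUA_FREE: rn->head_free]   <- atomlist "last"
 *
 * The sweeper stops for this epoch when it pops the RCUA_NEXT, so the
 * trailing RCUA_FREE (which frees rn itself) stays queued until the next
 * epoch and the list does not run empty at that point.
 */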

static void rcu_bump_maybe(void)
{
        seqlock_val_t dirty;

        dirty = atomic_load_explicit(&rcu_dirty, memory_order_relaxed);
        /* no problem if we race here and multiple threads bump rcu_seq;
         * bumping too much causes no issues while not bumping enough will
         * result in delayed cleanup
         */
        if (dirty == seqlock_cur(&rcu_seq))
                rcu_bump();
}

void rcu_read_lock(void)
{
        struct rcu_thread *rt = rcu_self();

        assert(rt);
        if (rt->depth++ > 0)
                return;

        seqlock_acquire(&rt->rcu, &rcu_seq);
        /* need to hold RCU for bump ... */
        rcu_bump_maybe();
        /* ... but no point in holding the old epoch if we just bumped */
        seqlock_acquire(&rt->rcu, &rcu_seq);
}

void rcu_read_unlock(void)
{
        struct rcu_thread *rt = rcu_self();

        assert(rt && rt->depth);
        if (--rt->depth > 0)
                return;
        rcu_bump_maybe();
        seqlock_release(&rt->rcu);
}
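
/* Nesting sketch: only the outermost lock/unlock pair touches the seqlock;
 * inner pairs just adjust rt->depth.
 *
 *   rcu_read_lock();     // depth 0 -> 1, acquires the current epoch
 *   rcu_read_lock();     // depth 1 -> 2, no seqlock traffic
 *   ...
 *   rcu_read_unlock();   // depth 2 -> 1
 *   rcu_read_unlock();   // depth 1 -> 0, maybe bumps, releases the epoch
 */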

void rcu_assert_read_locked(void)
{
        struct rcu_thread *rt = rcu_self();
        assert(rt && rt->depth && seqlock_held(&rt->rcu));
}

void rcu_assert_read_unlocked(void)
{
        struct rcu_thread *rt = rcu_self();
        assert(rt && !rt->depth && !seqlock_held(&rt->rcu));
}

/*
 * RCU resource-release thread
 */

static void *rcu_main(void *arg);

static void rcu_start(void)
{
        /* ensure we never handle signals on the RCU thread by blocking
         * everything here (new thread inherits signal mask)
         */
        sigset_t oldsigs, blocksigs;

        sigfillset(&blocksigs);
        pthread_sigmask(SIG_BLOCK, &blocksigs, &oldsigs);

        rcu_active = true;

        assert(!pthread_create(&rcu_pthread, NULL, rcu_main, NULL));

        pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);

#ifdef HAVE_PTHREAD_SETNAME_NP
# ifdef GNU_LINUX
        pthread_setname_np(rcu_pthread, "RCU sweeper");
# elif defined(__NetBSD__)
        pthread_setname_np(rcu_pthread, "RCU sweeper", NULL);
# endif
#elif defined(HAVE_PTHREAD_SET_NAME_NP)
        pthread_set_name_np(rcu_pthread, "RCU sweeper");
#endif
}

static void rcu_do(struct rcu_head *rh)
{
        struct rcu_head_close *rhc;
        void *p;

        switch (rh->action->type) {
        case RCUA_FREE:
                p = (char *)rh - rh->action->u.free.offset;
                if (rh->action->u.free.mt)
                        qfree(rh->action->u.free.mt, p);
                else
                        free(p);
                break;
        case RCUA_CLOSE:
                rhc = container_of(rh, struct rcu_head_close,
                                   rcu_head);
                close(rhc->fd);
                break;
        case RCUA_CALL:
                p = (char *)rh - rh->action->u.call.offset;
                rh->action->u.call.fptr(p);
                break;

        case RCUA_INVALID:
        case RCUA_NEXT:
        case RCUA_END:
        default:
                assert(0);
        }
}

static void rcu_watchdog(struct rcu_thread *rt)
{
#if 0
        /* future work: print a backtrace for the thread that's holding up
         * RCU. The only (good) way of doing that is to send a signal to the
         * other thread, save away the backtrace in the signal handler, and
         * block here until the signal is done processing.
         *
         * Just haven't implemented that yet.
         */
        fprintf(stderr, "RCU watchdog %p\n", rt);
#endif
}

static void *rcu_main(void *arg)
{
        struct rcu_thread *rt;
        struct rcu_head *rh = NULL;
        bool end = false;
        struct timespec maxwait;

        seqlock_val_t rcuval = SEQLOCK_STARTVAL;

        pthread_setspecific(rcu_thread_key, &rcu_thread_rcu);

        while (!end) {
                seqlock_wait(&rcu_seq, rcuval);

                /* RCU watchdog timeout, TODO: configurable value */
                clock_gettime(CLOCK_MONOTONIC, &maxwait);
                maxwait.tv_nsec += 100 * 1000 * 1000;
                if (maxwait.tv_nsec >= 1000000000) {
                        maxwait.tv_sec++;
                        maxwait.tv_nsec -= 1000000000;
                }

                frr_each (rcu_threads, &rcu_threads, rt)
                        if (!seqlock_timedwait(&rt->rcu, rcuval, &maxwait)) {
                                rcu_watchdog(rt);
                                seqlock_wait(&rt->rcu, rcuval);
                        }

                while ((rh = rcu_heads_pop(&rcu_heads))) {
                        if (rh->action->type == RCUA_NEXT)
                                break;
                        else if (rh->action->type == RCUA_END)
                                end = true;
                        else
                                rcu_do(rh);
                }

                rcuval += SEQLOCK_INCR;
        }

        /* rcu_shutdown can only be called singlethreaded, and it does a
         * pthread_join, so it should be impossible that anything ended up
         * on the queue after RCUA_END
         */
#if 1
        assert(!rcu_heads_first(&rcu_heads));
#else
        while ((rh = rcu_heads_pop(&rcu_heads)))
                if (rh->action->type >= RCUA_FREE)
                        rcu_do(rh);
#endif
        return NULL;
}

void rcu_shutdown(void)
{
        static struct rcu_head rcu_head_end;
        struct rcu_thread *rt = rcu_self();
        void *retval;

        if (!rcu_active)
                return;

        rcu_assert_read_locked();
        assert(rcu_threads_count(&rcu_threads) == 1);

        rcu_enqueue(&rcu_head_end, &rcua_end);

        rt->depth = 0;
        seqlock_release(&rt->rcu);
        seqlock_release(&rcu_seq);
        rcu_active = false;

        /* clearing rcu_active is before pthread_join in case we hang in
         * pthread_join & get a SIGTERM or something - in that case, just
         * ignore the maybe-still-running RCU thread
         */
        if (pthread_join(rcu_pthread, &retval) == 0) {
                seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
                seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);
                rt->depth = 1;
        }
}

/*
 * RCU'd free functions
 */

void rcu_enqueue(struct rcu_head *rh, const struct rcu_action *action)
{
        /* refer to rcu_bump() for why we need to hold RCU when adding items
         * to rcu_heads
         */
        rcu_assert_read_locked();

        rh->action = action;

        if (!rcu_active) {
                rcu_do(rh);
                return;
        }
        rcu_heads_add_tail(&rcu_heads, rh);
        atomic_store_explicit(&rcu_dirty, seqlock_cur(&rcu_seq),
                              memory_order_relaxed);
}
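
/* Example: deferring a custom callback via RCUA_CALL (hypothetical
 * "struct conn" / conn_finish(), shown for illustration; see also the
 * convenience wrappers declared in frrcu.h):
 *
 *   static const struct rcu_action conn_rcua = {
 *           .type = RCUA_CALL,
 *           .u.call = {
 *                   .fptr = conn_finish,
 *                   .offset = offsetof(struct conn, rcu),
 *           },
 *   };
 *
 *   rcu_read_lock();
 *   rcu_enqueue(&conn->rcu, &conn_rcua);   // conn_finish(conn) runs later
 *   rcu_read_unlock();
 */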

void rcu_close(struct rcu_head_close *rhc, int fd)
{
        rhc->fd = fd;
        rcu_enqueue(&rhc->rcu_head, &rcua_close);
}
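
/* Example: deferring a close() on a hypothetical connection object (names
 * are illustrative only):
 *
 *   struct conn {
 *           struct rcu_head_close rcu_close;
 *           int fd;
 *   };
 *
 *   rcu_read_lock();
 *   rcu_close(&conn->rcu_close, conn->fd);   // fd closed after grace period
 *   rcu_read_unlock();
 */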