// SPDX-License-Identifier: ISC
/*
 * Copyright (c) 2017-19  David Lamparter, for NetDEF, Inc.
 */

/* implementation notes:  this is an epoch-based RCU implementation.  rcu_seq
 * (global variable) counts the current epoch.  Threads hold a specific epoch
 * in rcu_read_lock().  This is the oldest epoch a thread might be accessing
 * data from.
 *
 * The rcu_seq global is only pushed forward on rcu_read_lock() and
 * rcu_read_unlock() calls.  This makes things a tad more efficient since
 * those are the only places it matters:
 * - on rcu_read_lock, we don't want to hold an old epoch pointlessly
 * - on rcu_read_unlock, we want to make sure we're not stuck on an old epoch
 *   when heading into a long idle period where no thread holds RCU
 *
 * rcu_thread structures themselves are RCU-free'd.
 *
 * rcu_head structures are the most iffy;  normally for an ATOMLIST we would
 * need to make sure we use rcu_free or pthread_rwlock to deallocate old items
 * to prevent ABA or use-after-free problems.  However, our ATOMLIST code
 * guarantees that if the list remains non-empty in all cases, we only need
 * the "last" pointer to do an "add_tail()", i.e. we can't run into ABA/UAF
 * issues - but we do need to keep at least 1 item on the list.
 *
 * (Search the atomlist code for all uses of "last")
 */
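
/* usage sketch for the reader/writer model described above.  This is
 * illustrative only; "struct item" and MTYPE_ITEM are made-up names, while
 * rcu_read_lock()/rcu_read_unlock()/rcu_free() are the real frrcu.h API:
 *
 *	struct item {
 *		struct rcu_head rcu_head;
 *		int value;
 *	};
 *
 *	void reader(void)
 *	{
 *		rcu_read_lock();	// holds the current epoch ...
 *		// ... so pointers loaded here stay valid until unlock
 *		rcu_read_unlock();
 *	}
 *
 *	void writer(struct item *victim)
 *	{
 *		rcu_read_lock();
 *		// unlink victim from shared data structures first, then:
 *		rcu_free(MTYPE_ITEM, victim, rcu_head);
 *		rcu_read_unlock();
 *		// victim is free'd once all readers left the old epoch
 *	}
 */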

#include "zebra.h"

#include <pthread.h>
#ifdef HAVE_PTHREAD_NP_H
#include <pthread_np.h>
#endif
#include <string.h>
#include <unistd.h>
#include <signal.h>

#include "frrcu.h"
#include "seqlock.h"
#include "atomlist.h"
#include "memory.h"

DEFINE_MTYPE_STATIC(LIB, RCU_THREAD, "RCU thread");
DEFINE_MTYPE_STATIC(LIB, RCU_NEXT, "RCU sequence barrier");

DECLARE_ATOMLIST(rcu_heads, struct rcu_head, head);

PREDECL_ATOMLIST(rcu_threads);
struct rcu_thread {
	struct rcu_threads_item head;

	struct rcu_head rcu_head;

	struct seqlock rcu;

	/* only accessed by thread itself, not atomic */
	unsigned depth;
};
DECLARE_ATOMLIST(rcu_threads, struct rcu_thread, head);

static const struct rcu_action rcua_next = { .type = RCUA_NEXT };
static const struct rcu_action rcua_end = { .type = RCUA_END };
static const struct rcu_action rcua_close = { .type = RCUA_CLOSE };

/* used by rcu_bump() to queue a sequence barrier plus its own cleanup */
struct rcu_next {
	struct rcu_head head_free;
	struct rcu_head head_next;
};

#define rcu_free_internal(mtype, ptr, field)                                   \
	do {                                                                   \
		typeof(ptr) _ptr = (ptr);                                      \
		struct rcu_head *_rcu_head = &_ptr->field;                     \
		static const struct rcu_action _rcu_action = {                 \
			.type = RCUA_FREE,                                     \
			.u.free = {                                            \
				.mt = mtype,                                   \
				.offset = offsetof(typeof(*_ptr), field),      \
			},                                                     \
		};                                                             \
		_rcu_head->action = &_rcu_action;                              \
		rcu_heads_add_tail(&rcu_heads, _rcu_head);                     \
	} while (0)
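
/* expansion sketch for the macro above, using one of its callers below
 * (rcu_thread_unprepare) as the example:
 *
 *	rcu_free_internal(MTYPE_RCU_THREAD, rt, rcu_head);
 *
 * sets rt->rcu_head.action to a static RCUA_FREE action recording
 * MTYPE_RCU_THREAD and offsetof(struct rcu_thread, rcu_head), then appends
 * rt->rcu_head to the global rcu_heads queue.  The sweeper later subtracts
 * the offset to recover rt and qfree()s it.
 */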

/* primary global RCU position */
static struct seqlock rcu_seq;
/* this is set to rcu_seq whenever something is added on the RCU queue.
 * rcu_read_lock() and rcu_read_unlock() will then bump rcu_seq up one step.
 */
static _Atomic seqlock_val_t rcu_dirty;

static struct rcu_threads_head rcu_threads;
static struct rcu_heads_head rcu_heads;

/* main thread & RCU sweeper have pre-setup rcu_thread structures.  The
 * reasons are different:
 *
 * - rcu_thread_main is there because the main thread isn't started like
 *   other threads, it's implicitly created when the program is started.  So
 *   rcu_thread_main matches up implicitly.
 *
 * - rcu_thread_rcu isn't actually put on the rcu_threads list (makes no
 *   sense really), it only exists so we can call RCU-using functions from
 *   the RCU thread without special handling in rcu_read_lock/unlock.
 */
static struct rcu_thread rcu_thread_main;
static struct rcu_thread rcu_thread_rcu;

static pthread_t rcu_pthread;
static pthread_key_t rcu_thread_key;
static bool rcu_active;

static void rcu_start(void);
static void rcu_bump(void);

/*
 * preinitialization for main thread
 */
static void rcu_thread_end(void *rcu_thread);

static void rcu_preinit(void) __attribute__((constructor));
static void rcu_preinit(void)
{
	struct rcu_thread *rt;

	rt = &rcu_thread_main;
	rt->depth = 1;
	seqlock_init(&rt->rcu);
	seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);

	pthread_key_create(&rcu_thread_key, rcu_thread_end);
	pthread_setspecific(rcu_thread_key, rt);

	rcu_threads_add_tail(&rcu_threads, rt);

	/* RCU sweeper's rcu_thread is a dummy, NOT added to rcu_threads */
	rt = &rcu_thread_rcu;
	rt->depth = 1;

	seqlock_init(&rcu_seq);
	seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
}

static struct rcu_thread *rcu_self(void)
{
	return (struct rcu_thread *)pthread_getspecific(rcu_thread_key);
}

/*
 * thread management (for the non-main thread)
 */
struct rcu_thread *rcu_thread_prepare(void)
{
	struct rcu_thread *rt, *cur;

	rcu_assert_read_locked();

	if (!rcu_active)
		rcu_start();

	cur = rcu_self();

	/* new thread always starts with rcu_read_lock held at depth 1, and
	 * holding the same epoch as the parent (this makes it possible to
	 * use RCU for things passed into the thread through its arg)
	 */
	rt = XCALLOC(MTYPE_RCU_THREAD, sizeof(*rt));
	rt->depth = 1;

	seqlock_init(&rt->rcu);
	seqlock_acquire(&rt->rcu, &cur->rcu);

	rcu_threads_add_tail(&rcu_threads, rt);

	return rt;
}

void rcu_thread_start(struct rcu_thread *rt)
{
	pthread_setspecific(rcu_thread_key, rt);
}
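
/* sketch of the intended prepare/start handoff (illustrative - in FRR,
 * frr_pthread.c performs these steps; "startup_arg" and "thread_fn" are
 * made-up names):
 *
 *	// parent thread, with rcu_read_lock() held:
 *	startup_arg->rcu_thr = rcu_thread_prepare();
 *	pthread_create(&pt, NULL, thread_fn, startup_arg);
 *
 *	// in thread_fn(), before doing anything else:
 *	rcu_thread_start(startup_arg->rcu_thr);
 *	// the new thread now holds RCU at depth 1 on the parent's epoch,
 *	// so RCU-protected pointers in *startup_arg remain valid here
 *	rcu_read_unlock();
 */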

void rcu_thread_unprepare(struct rcu_thread *rt)
{
	if (rt == &rcu_thread_rcu)
		return;

	rt->depth = 1;
	seqlock_acquire(&rt->rcu, &rcu_seq);

	rcu_bump();
	if (rt != &rcu_thread_main)
		/* this free() happens after seqlock_release() below */
		rcu_free_internal(MTYPE_RCU_THREAD, rt, rcu_head);

	rcu_threads_del(&rcu_threads, rt);
	seqlock_release(&rt->rcu);
}

static void rcu_thread_end(void *rtvoid)
{
	struct rcu_thread *rt = rtvoid;

	rcu_thread_unprepare(rt);
}

/*
 * main RCU control aspects
 */

static void rcu_bump(void)
{
	struct rcu_next *rn;

	rn = XMALLOC(MTYPE_RCU_NEXT, sizeof(*rn));

	/* note: each RCUA_NEXT item corresponds to exactly one seqno bump.
	 * This means we don't need to communicate which seqno is which
	 * RCUA_NEXT, since we really don't care.
	 */

	/*
	 * Important race condition:  while rcu_heads_add_tail is executing,
	 * there is an intermediate point where the rcu_heads "last" pointer
	 * already points to rn->head_next, but rn->head_next isn't added to
	 * the list yet.  That means any other "add_tail" calls append to this
	 * item, which isn't fully on the list yet.  Freeze this thread at
	 * that point and look at another thread doing a rcu_bump.  It adds
	 * these two items and then does a seqlock_bump.  But the rcu_heads
	 * list is still "interrupted" and there's no RCUA_NEXT on the list
	 * yet (from either the frozen thread or the second thread).  So
	 * rcu_main() might actually hit the end of the list at the
	 * "interrupt".
	 *
	 * This situation is prevented by requiring that rcu_read_lock is held
	 * for any calls to rcu_bump, since if we're holding the current RCU
	 * epoch, that means rcu_main can't be chewing on rcu_heads and hit
	 * that interruption point.  Only by the time the thread has continued
	 * to rcu_read_unlock() - and therefore completed the add_tail - the
	 * RCU sweeper gobbles up the epoch and can be sure to find at least
	 * the RCUA_NEXT and RCUA_FREE items on rcu_heads.
	 */
	rn->head_next.action = &rcua_next;
	rcu_heads_add_tail(&rcu_heads, &rn->head_next);

	/* free rn that we allocated above.
	 *
	 * This is INTENTIONALLY not built into the RCUA_NEXT action.  This
	 * ensures that after the action above is popped off the queue, there
	 * is still at least 1 item on the RCU queue.  This means we never
	 * delete the last item, which is extremely important since it keeps
	 * the atomlist ->last pointer alive and well.
	 *
	 * If we were to "run dry" on the RCU queue, add_tail may run into the
	 * "last item is being deleted - start over" case, and then we may end
	 * up accessing old RCU queue items that are already free'd.
	 */
	rcu_free_internal(MTYPE_RCU_NEXT, rn, head_free);

	/* Only allow the RCU sweeper to run after these 2 items are queued.
	 *
	 * If another thread enqueues some RCU action in the intermediate
	 * window here, nothing bad happens - the queued action is associated
	 * with a larger seq# than strictly necessary.  Thus, it might get
	 * executed a bit later, but that's not a problem.
	 *
	 * If another thread acquires the read lock in this window, it holds
	 * the previous epoch, but its RCU queue actions will be in the next
	 * epoch.  This isn't a problem either, just a tad inefficient.
	 */
	seqlock_bump(&rcu_seq);
}
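
/* resulting queue shape after two bumps, assuming user items a/b and c were
 * queued in between (illustrative):
 *
 *	rcu_heads: [FREE a] [FREE b] [NEXT] [FREE rn1] [FREE c] [NEXT] [FREE rn2]
 *	           '--- epoch N ----'       '----- epoch N+1 ------'
 *
 * the sweeper stops at each RCUA_NEXT barrier and waits for the next epoch;
 * the trailing RCUA_FREE for the rcu_next allocation itself guarantees the
 * list never drains completely.
 */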

static void rcu_bump_maybe(void)
{
	seqlock_val_t dirty;

	dirty = atomic_load_explicit(&rcu_dirty, memory_order_relaxed);
	/* no problem if we race here and multiple threads bump rcu_seq;
	 * bumping too much causes no issues while not bumping enough will
	 * result in delayed cleanup
	 */
	if (dirty == seqlock_cur(&rcu_seq))
		rcu_bump();
}

void rcu_read_lock(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt);
	if (rt->depth++ > 0)
		return;

	seqlock_acquire(&rt->rcu, &rcu_seq);
	/* need to hold RCU for bump ... */
	rcu_bump_maybe();
	/* ... but no point in holding the old epoch if we just bumped */
	seqlock_acquire(&rt->rcu, &rcu_seq);
}

void rcu_read_unlock(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt && rt->depth);
	if (--rt->depth > 0)
		return;

	rcu_bump_maybe();
	seqlock_release(&rt->rcu);
}
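
/* read-side nesting sketch (illustrative): the depth counter makes
 * lock/unlock pairs freely nestable; only the outermost pair touches the
 * thread's seqlock position.
 *
 *	rcu_read_lock();	// depth 0 -> 1, acquires current epoch
 *	rcu_read_lock();	// depth 1 -> 2, no seqlock work
 *	...
 *	rcu_read_unlock();	// depth 2 -> 1
 *	rcu_read_unlock();	// depth 1 -> 0, releases epoch
 */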

void rcu_assert_read_locked(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt && rt->depth && seqlock_held(&rt->rcu));
}

void rcu_assert_read_unlocked(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt && !rt->depth && !seqlock_held(&rt->rcu));
}

/*
 * RCU resource-release thread
 */

static void *rcu_main(void *arg);

static void rcu_start(void)
{
	/* ensure we never handle signals on the RCU thread by blocking
	 * everything here (new thread inherits signal mask)
	 */
	sigset_t oldsigs, blocksigs;

	sigfillset(&blocksigs);
	pthread_sigmask(SIG_BLOCK, &blocksigs, &oldsigs);

	rcu_active = true;

	assert(!pthread_create(&rcu_pthread, NULL, rcu_main, NULL));

	pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);

#ifdef HAVE_PTHREAD_SETNAME_NP
# ifdef GNU_LINUX
	pthread_setname_np(rcu_pthread, "RCU sweeper");
# elif defined(__NetBSD__)
	pthread_setname_np(rcu_pthread, "RCU sweeper", NULL);
# endif
#elif defined(HAVE_PTHREAD_SET_NAME_NP)
	pthread_set_name_np(rcu_pthread, "RCU sweeper");
#endif
}

static void rcu_do(struct rcu_head *rh)
{
	struct rcu_head_close *rhc;
	void *p;

	switch (rh->action->type) {
	case RCUA_FREE:
		p = (char *)rh - rh->action->u.free.offset;
		if (rh->action->u.free.mt)
			qfree(rh->action->u.free.mt, p);
		else
			free(p);
		break;
	case RCUA_CLOSE:
		rhc = container_of(rh, struct rcu_head_close,
				   rcu_head);
		close(rhc->fd);
		break;
	case RCUA_CALL:
		p = (char *)rh - rh->action->u.call.offset;
		rh->action->u.call.fptr(p);
		break;
	default:
		/* RCUA_NEXT/RCUA_END are handled in rcu_main() */
		assert(0);
	}
}
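
/* sketch of how an RCUA_CALL action reaches rcu_do().  frrcu.h's rcu_call()
 * macro builds this; the code below is an approximation of its expansion,
 * not the exact header contents ("struct item"/"item_finish" are made up):
 *
 *	static void item_finish(void *arg);
 *
 *	static const struct rcu_action item_action = {
 *		.type = RCUA_CALL,
 *		.u.call = {
 *			.fptr = item_finish,
 *			.offset = offsetof(struct item, rcu_head),
 *		},
 *	};
 *
 *	rcu_enqueue(&item->rcu_head, &item_action);
 *
 * one epoch later, rcu_do() subtracts the offset to recover the struct item
 * pointer and invokes item_finish() on it.
 */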

static void rcu_watchdog(struct rcu_thread *rt)
{
	/* future work: print a backtrace for the thread that's holding up
	 * RCU.  The only (good) way of doing that is to send a signal to the
	 * other thread, save away the backtrace in the signal handler, and
	 * block here until the signal is done processing.
	 *
	 * Just haven't implemented that yet.
	 */
	fprintf(stderr, "RCU watchdog %p\n", rt);
}

static void *rcu_main(void *arg)
{
	struct rcu_thread *rt;
	struct rcu_head *rh = NULL;
	bool end = false;
	struct timespec maxwait;

	seqlock_val_t rcuval = SEQLOCK_STARTVAL;

	pthread_setspecific(rcu_thread_key, &rcu_thread_rcu);

	while (!end) {
		seqlock_wait(&rcu_seq, rcuval);

		/* RCU watchdog timeout, TODO: configurable value */
		clock_gettime(CLOCK_MONOTONIC, &maxwait);
		maxwait.tv_nsec += 100 * 1000 * 1000;
		if (maxwait.tv_nsec >= 1000000000) {
			maxwait.tv_sec++;
			maxwait.tv_nsec -= 1000000000;
		}

		frr_each (rcu_threads, &rcu_threads, rt)
			if (!seqlock_timedwait(&rt->rcu, rcuval, &maxwait)) {
				rcu_watchdog(rt);
				seqlock_wait(&rt->rcu, rcuval);
			}

		while ((rh = rcu_heads_pop(&rcu_heads))) {
			if (rh->action->type == RCUA_NEXT)
				break;
			else if (rh->action->type == RCUA_END)
				end = true;
			else
				rcu_do(rh);
		}

		rcuval += SEQLOCK_INCR;
	}

	/* rcu_shutdown can only be called singlethreaded, and it does a
	 * pthread_join, so it should be impossible that anything ended up
	 * on the queue after RCUA_END
	 */
#if 1
	assert(!rcu_heads_first(&rcu_heads));
#else
	while ((rh = rcu_heads_pop(&rcu_heads)))
		if (rh->action->type >= RCUA_FREE)
			rcu_do(rh);
#endif

	return NULL;
}

void rcu_shutdown(void)
{
	static struct rcu_head rcu_head_end;
	struct rcu_thread *rt = rcu_self();
	void *retval;

	if (!rcu_active)
		return;

	rcu_assert_read_locked();
	assert(rcu_threads_count(&rcu_threads) == 1);

	rcu_enqueue(&rcu_head_end, &rcua_end);

	rt->depth = 0;
	seqlock_release(&rt->rcu);
	seqlock_release(&rcu_seq);
	rcu_active = false;

	/* clearing rcu_active is before pthread_join in case we hang in
	 * pthread_join & get a SIGTERM or something - in that case, just
	 * ignore the maybe-still-running RCU thread
	 */
	if (pthread_join(rcu_pthread, &retval) == 0) {
		seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
		seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);
		rt->depth = 1;
	}
}

/*
 * RCU'd free functions
 */

void rcu_enqueue(struct rcu_head *rh, const struct rcu_action *action)
{
	/* refer to rcu_bump() for why we need to hold RCU when adding items
	 * to rcu_heads
	 */
	rcu_assert_read_locked();

	rh->action = action;

	if (!rcu_active) {
		/* no sweeper thread yet - run the action immediately */
		rcu_do(rh);
		return;
	}

	rcu_heads_add_tail(&rcu_heads, rh);
	atomic_store_explicit(&rcu_dirty, seqlock_cur(&rcu_seq),
			      memory_order_relaxed);
}

void rcu_close(struct rcu_head_close *rhc, int fd)
{
	rhc->fd = fd;

	rcu_enqueue(&rhc->rcu_head, &rcua_close);
}
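
/* usage sketch for rcu_close() (illustrative; "struct connection" is a
 * made-up example type):
 *
 *	struct connection {
 *		struct rcu_head_close closer;
 *		int fd;
 *	};
 *
 *	// writer side, under rcu_read_lock(), after unlinking conn from
 *	// shared data structures:
 *	rcu_close(&conn->closer, conn->fd);
 *
 * the fd stays open until all threads have left the current epoch, so
 * readers still holding conn can finish their read()/write() calls on a
 * valid descriptor.
 */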