/*
 * Copyright (c) 2017-19  David Lamparter, for NetDEF, Inc.
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */
/* implementation notes: this is an epoch-based RCU implementation. rcu_seq
 * (global variable) counts the current epoch. Threads hold a specific epoch
 * in rcu_read_lock(). This is the oldest epoch a thread might be accessing
 * data from.
 *
 * The rcu_seq global is only pushed forward on rcu_read_lock() and
 * rcu_read_unlock() calls. This makes things a tad more efficient since
 * those are the only places it matters:
 * - on rcu_read_lock, we don't want to hold an old epoch pointlessly
 * - on rcu_read_unlock, we want to make sure we're not stuck on an old epoch
 *   when heading into a long idle period where no thread holds RCU
 *
 * rcu_thread structures themselves are RCU-free'd.
 *
 * rcu_head structures are the most iffy; normally for an ATOMLIST we would
 * need to make sure we use rcu_free or pthread_rwlock to deallocate old items
 * to prevent ABA or use-after-free problems. However, our ATOMLIST code
 * guarantees that if the list remains non-empty in all cases, we only need
 * the "last" pointer to do an "add_tail()", i.e. we can't run into ABA/UAF
 * issues - but we do need to keep at least 1 item on the list.
 *
 * (Search the atomlist code for all uses of "last")
 */
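/* Illustrative usage sketch - not part of this file.  This shows the
 * reader/writer pattern the notes above describe, assuming the public
 * rcu_free() wrapper from frrcu.h; "struct item", "shared_head",
 * unlink_item() and MTYPE_ITEM are hypothetical caller-side names.
 *
 *	struct item {
 *		struct item *next;
 *		struct rcu_head rcu_head;
 *		int value;
 *	};
 *
 *	// reader: data reached from shared pointers stays valid until
 *	// rcu_read_unlock() releases the epoch this thread holds
 *	rcu_read_lock();
 *	struct item *it = atomic_load_explicit(&shared_head,
 *					       memory_order_acquire);
 *	if (it)
 *		use(it->value);
 *	rcu_read_unlock();
 *
 *	// writer: unlink first, then defer the free; the sweeper frees
 *	// the item once every thread has moved past the current epoch
 *	rcu_read_lock();
 *	struct item *old = unlink_item(&shared_head);
 *	rcu_free(MTYPE_ITEM, old, rcu_head);
 *	rcu_read_unlock();
 */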
#include "zebra.h"

#include <pthread.h>
#ifdef HAVE_PTHREAD_NP_H
#include <pthread_np.h>
#endif
#include <string.h>
#include <unistd.h>
#include <signal.h>

#include "frrcu.h"
#include "memory.h"
#include "seqlock.h"
#include "atomlist.h"
DEFINE_MTYPE_STATIC(LIB, RCU_THREAD, "RCU thread")

DECLARE_ATOMLIST(rcu_heads, struct rcu_head, head)
PREDECL_ATOMLIST(rcu_threads)

struct rcu_thread {
	struct rcu_threads_item head;

	struct rcu_head rcu_head;

	struct seqlock rcu;

	/* only accessed by thread itself, not atomic */
	unsigned depth;
};

DECLARE_ATOMLIST(rcu_threads, struct rcu_thread, head)
static const struct rcu_action rcua_next  = { .type = RCUA_NEXT };
static const struct rcu_action rcua_end   = { .type = RCUA_END };
static const struct rcu_action rcua_close = { .type = RCUA_CLOSE };
struct rcu_next {
	struct rcu_head head_free;
	struct rcu_head head_next;
};
#define rcu_free_internal(mtype, ptr, field)                                   \
	do {                                                                   \
		typeof(ptr) _ptr = (ptr);                                      \
		struct rcu_head *_rcu_head = &_ptr->field;                     \
		static const struct rcu_action _rcu_action = {                 \
			.type = RCUA_FREE,                                     \
			.u.free = {                                            \
				.mt = mtype,                                   \
				.offset = offsetof(typeof(*_ptr), field),      \
			},                                                     \
		};                                                             \
		_rcu_head->action = &_rcu_action;                              \
		rcu_heads_add_tail(&rcu_heads, _rcu_head);                     \
	} while (0)
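/* Illustrative sketch of the offsetof() mechanics above - not part of this
 * file; "struct foo" / MTYPE_FOO are hypothetical.  Given
 *
 *	struct foo {
 *		int data;
 *		struct rcu_head rcu_head;
 *	};
 *
 * rcu_free_internal(MTYPE_FOO, foo_ptr, rcu_head) queues &foo_ptr->rcu_head
 * with .offset = offsetof(struct foo, rcu_head); the sweeper later recovers
 * the container pointer in rcu_do() as
 *
 *	p = (char *)rh - rh->action->u.free.offset;
 *
 * and hands it to qfree().  The rcu_action can be static const because the
 * offset is a compile-time constant per (type, field) pair.
 */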
/* primary global RCU position */
static struct seqlock rcu_seq;
/* this is set to rcu_seq whenever something is added on the RCU queue.
 * rcu_read_lock() and rcu_read_unlock() will then bump rcu_seq up one step.
 */
static _Atomic seqlock_val_t rcu_dirty;

static struct rcu_threads_head rcu_threads;
static struct rcu_heads_head rcu_heads;
108 /* main thread & RCU sweeper have pre-setup rcu_thread structures. The
109 * reasons are different:
111 * - rcu_thread_main is there because the main thread isn't started like
112 * other threads, it's implicitly created when the program is started. So
113 * rcu_thread_main matches up implicitly.
115 * - rcu_thread_rcu isn't actually put on the rcu_threads list (makes no
116 * sense really), it only exists so we can call RCU-using functions from
117 * the RCU thread without special handling in rcu_read_lock/unlock.
119 static struct rcu_thread rcu_thread_main
;
120 static struct rcu_thread rcu_thread_rcu
;
122 static pthread_t rcu_pthread
;
123 static pthread_key_t rcu_thread_key
;
124 static bool rcu_active
;
static void rcu_start(void);
static void rcu_bump(void);

/*
 * preinitialization for main thread
 */
static void rcu_thread_end(void *rcu_thread);

static void rcu_preinit(void) __attribute__((constructor));
static void rcu_preinit(void)
{
	struct rcu_thread *rt;

	rt = &rcu_thread_main;
	rt->depth = 1;
	seqlock_init(&rt->rcu);
	seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);

	pthread_key_create(&rcu_thread_key, rcu_thread_end);
	pthread_setspecific(rcu_thread_key, rt);

	rcu_threads_add_tail(&rcu_threads, rt);

	/* RCU sweeper's rcu_thread is a dummy, NOT added to rcu_threads */
	rt = &rcu_thread_rcu;
	rt->depth = 1;

	seqlock_init(&rcu_seq);
	seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
}
static struct rcu_thread *rcu_self(void)
{
	return (struct rcu_thread *)pthread_getspecific(rcu_thread_key);
}
/*
 * thread management (for the non-main thread)
 */
struct rcu_thread *rcu_thread_prepare(void)
{
	struct rcu_thread *rt, *cur;

	rcu_assert_read_locked();

	if (!rcu_active)
		rcu_start();

	cur = rcu_self();

	/* new thread always starts with rcu_read_lock held at depth 1, and
	 * holding the same epoch as the parent (this makes it possible to
	 * use RCU for things passed into the thread through its arg)
	 */
	rt = XCALLOC(MTYPE_RCU_THREAD, sizeof(*rt));
	rt->depth = 1;

	seqlock_init(&rt->rcu);
	seqlock_acquire(&rt->rcu, &cur->rcu);

	rcu_threads_add_tail(&rcu_threads, rt);

	return rt;
}
void rcu_thread_start(struct rcu_thread *rt)
{
	pthread_setspecific(rcu_thread_key, rt);
}
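/* Illustrative sketch (hypothetical caller code): the parent prepares the
 * rcu_thread while read-locked, hands it through the thread argument, and
 * the child adopts it first thing.  This is what makes RCU-protected data
 * in "arg" safe to use inside the new thread.
 *
 *	static void *worker(void *arg)
 *	{
 *		rcu_thread_start(arg);	// adopt pre-acquired epoch, depth 1
 *		...
 *		rcu_read_unlock();	// drop the lock we started with
 *		...
 *		return NULL;
 *	}
 *
 *	// parent:
 *	rcu_read_lock();
 *	struct rcu_thread *rt = rcu_thread_prepare();
 *	pthread_create(&thread, NULL, worker, rt);
 *	rcu_read_unlock();
 */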
void rcu_thread_unprepare(struct rcu_thread *rt)
{
	if (rt == &rcu_thread_rcu)
		return;

	rt->depth = 1;
	seqlock_acquire(&rt->rcu, &rcu_seq);

	rcu_bump();
	if (rt != &rcu_thread_main)
		/* this free() happens after seqlock_release() below */
		rcu_free_internal(MTYPE_RCU_THREAD, rt, rcu_head);

	rcu_threads_del(&rcu_threads, rt);
	seqlock_release(&rt->rcu);
}
static void rcu_thread_end(void *rtvoid)
{
	struct rcu_thread *rt = rtvoid;
	rcu_thread_unprepare(rt);
}
/*
 * main RCU control aspects
 */

static void rcu_bump(void)
{
	struct rcu_next *rn;

	rn = XMALLOC(MTYPE_RCU_THREAD, sizeof(*rn));

	/* note: each RCUA_NEXT item corresponds to exactly one seqno bump.
	 * This means we don't need to communicate which seqno is which
	 * RCUA_NEXT, since we really don't care.
	 */

	/*
	 * Important race condition: while rcu_heads_add_tail is executing,
	 * there is an intermediate point where the rcu_heads "last" pointer
	 * already points to rn->head_next, but rn->head_next isn't added to
	 * the list yet.  That means any other "add_tail" calls append to this
	 * item, which isn't fully on the list yet.  Freeze this thread at
	 * that point and look at another thread doing a rcu_bump.  It adds
	 * these two items and then does a seqlock_bump.  But the rcu_heads
	 * list is still "interrupted" and there's no RCUA_NEXT on the list
	 * yet (from either the frozen thread or the second thread).  So
	 * rcu_main() might actually hit the end of the list at the
	 * "interrupt".
	 *
	 * This situation is prevented by requiring that rcu_read_lock is held
	 * for any calls to rcu_bump, since if we're holding the current RCU
	 * epoch, that means rcu_main can't be chewing on rcu_heads and hit
	 * that interruption point.  Only by the time the thread has continued
	 * to rcu_read_unlock() - and therefore completed the add_tail - the
	 * RCU sweeper gobbles up the epoch and can be sure to find at least
	 * the RCUA_NEXT and RCUA_FREE items on rcu_heads.
	 */
	rn->head_next.action = &rcua_next;
	rcu_heads_add_tail(&rcu_heads, &rn->head_next);

	/* free rn that we allocated above.
	 *
	 * This is INTENTIONALLY not built into the RCUA_NEXT action.  This
	 * ensures that after the action above is popped off the queue, there
	 * is still at least 1 item on the RCU queue.  This means we never
	 * delete the last item, which is extremely important since it keeps
	 * the atomlist ->last pointer alive and well.
	 *
	 * If we were to "run dry" on the RCU queue, add_tail may run into the
	 * "last item is being deleted - start over" case, and then we may end
	 * up accessing old RCU queue items that are already free'd.
	 */
	rcu_free_internal(MTYPE_RCU_THREAD, rn, head_free);

	/* Only allow the RCU sweeper to run after these 2 items are queued.
	 *
	 * If another thread enqueues some RCU action in the intermediate
	 * window here, nothing bad happens - the queued action is associated
	 * with a larger seq# than strictly necessary.  Thus, it might get
	 * executed a bit later, but that's not a problem.
	 *
	 * If another thread acquires the read lock in this window, it holds
	 * the previous epoch, but its RCU queue actions will be in the next
	 * epoch.  This isn't a problem either, just a tad inefficient.
	 */
	seqlock_bump(&rcu_seq);
}
static void rcu_bump_maybe(void)
{
	seqlock_val_t dirty;

	dirty = atomic_load_explicit(&rcu_dirty, memory_order_relaxed);
	/* no problem if we race here and multiple threads bump rcu_seq;
	 * bumping too much causes no issues while not bumping enough will
	 * result in delayed cleanup
	 */
	if (dirty == seqlock_cur(&rcu_seq))
		rcu_bump();
}
void rcu_read_lock(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt);
	if (rt->depth++ > 0)
		return;

	seqlock_acquire(&rt->rcu, &rcu_seq);
	/* need to hold RCU for bump ... */
	rcu_bump_maybe();
	/* ... but no point in holding the old epoch if we just bumped */
	seqlock_acquire(&rt->rcu, &rcu_seq);
}
void rcu_read_unlock(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt && rt->depth);
	if (--rt->depth > 0)
		return;

	rcu_bump_maybe();
	seqlock_release(&rt->rcu);
}
void rcu_assert_read_locked(void)
{
	struct rcu_thread *rt = rcu_self();
	assert(rt && rt->depth && seqlock_held(&rt->rcu));
}

void rcu_assert_read_unlocked(void)
{
	struct rcu_thread *rt = rcu_self();
	assert(rt && !rt->depth && !seqlock_held(&rt->rcu));
}
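/* Illustrative: read locks nest per thread; only the outermost pair
 * touches the thread's seqlock.
 *
 *	rcu_read_lock();	// depth 0 -> 1, acquire current epoch
 *	rcu_read_lock();	// depth 1 -> 2, no seqlock traffic
 *	rcu_assert_read_locked();
 *	rcu_read_unlock();	// depth 2 -> 1
 *	rcu_read_unlock();	// depth 1 -> 0, release epoch
 *	rcu_assert_read_unlocked();
 */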
/*
 * RCU resource-release thread
 */

static void *rcu_main(void *arg);

static void rcu_start(void)
{
	/* ensure we never handle signals on the RCU thread by blocking
	 * everything here (new thread inherits signal mask)
	 */
	sigset_t oldsigs, blocksigs;

	sigfillset(&blocksigs);
	pthread_sigmask(SIG_BLOCK, &blocksigs, &oldsigs);

	rcu_active = true;

	assert(!pthread_create(&rcu_pthread, NULL, rcu_main, NULL));

	pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);

#ifdef HAVE_PTHREAD_SETNAME_NP
# ifdef GNU_LINUX
	pthread_setname_np(rcu_pthread, "RCU sweeper");
# elif defined(__NetBSD__)
	pthread_setname_np(rcu_pthread, "RCU sweeper", NULL);
# endif
#elif defined(HAVE_PTHREAD_SET_NAME_NP)
	pthread_set_name_np(rcu_pthread, "RCU sweeper");
#endif
}
static void rcu_do(struct rcu_head *rh)
{
	struct rcu_head_close *rhc;
	void *p;

	switch (rh->action->type) {
	case RCUA_FREE:
		p = (char *)rh - rh->action->u.free.offset;
		if (rh->action->u.free.mt)
			qfree(rh->action->u.free.mt, p);
		else
			free(p);
		break;
	case RCUA_CLOSE:
		rhc = container_of(rh, struct rcu_head_close,
				   rcu_head);
		close(rhc->fd);
		break;
	case RCUA_CALL:
		p = (char *)rh - rh->action->u.call.offset;
		rh->action->u.call.fptr(p);
		break;
	default:
		/* RCUA_NEXT / RCUA_END are handled in rcu_main() and must
		 * never reach this point
		 */
		assert(0);
	}
}
static void rcu_watchdog(struct rcu_thread *rt)
{
	/* future work: print a backtrace for the thread that's holding up
	 * RCU.  The only (good) way of doing that is to send a signal to the
	 * other thread, save away the backtrace in the signal handler, and
	 * block here until the signal is done processing.
	 *
	 * Just haven't implemented that yet.
	 */
	fprintf(stderr, "RCU watchdog %p\n", rt);
}
static void *rcu_main(void *arg)
{
	struct rcu_thread *rt;
	struct rcu_head *rh = NULL;
	bool end = false;
	struct timespec maxwait;

	seqlock_val_t rcuval = SEQLOCK_STARTVAL;

	pthread_setspecific(rcu_thread_key, &rcu_thread_rcu);

	while (!end) {
		seqlock_wait(&rcu_seq, rcuval);

		/* RCU watchdog timeout, TODO: configurable value */
		clock_gettime(CLOCK_MONOTONIC, &maxwait);
		maxwait.tv_nsec += 100 * 1000 * 1000;
		if (maxwait.tv_nsec >= 1000000000) {
			maxwait.tv_sec++;
			maxwait.tv_nsec -= 1000000000;
		}

		frr_each (rcu_threads, &rcu_threads, rt)
			if (!seqlock_timedwait(&rt->rcu, rcuval, &maxwait)) {
				rcu_watchdog(rt);
				seqlock_wait(&rt->rcu, rcuval);
			}

		while ((rh = rcu_heads_pop(&rcu_heads))) {
			if (rh->action->type == RCUA_NEXT)
				break;
			else if (rh->action->type == RCUA_END)
				end = true;
			else
				rcu_do(rh);
		}

		rcuval += SEQLOCK_INCR;
	}

	/* rcu_shutdown can only be called singlethreaded, and it does a
	 * pthread_join, so it should be impossible that anything ended up
	 * on the queue after RCUA_END
	 */
#if 1
	assert(!rcu_heads_first(&rcu_heads));
#else
	while ((rh = rcu_heads_pop(&rcu_heads)))
		if (rh->action->type >= RCUA_FREE)
			rcu_do(rh);
#endif
	return NULL;
}
void rcu_shutdown(void)
{
	static struct rcu_head rcu_head_end;
	struct rcu_thread *rt = rcu_self();
	void *retval;

	if (!rcu_active)
		return;

	rcu_assert_read_locked();
	assert(rcu_threads_count(&rcu_threads) == 1);

	rcu_enqueue(&rcu_head_end, &rcua_end);

	rt->depth = 0;
	seqlock_release(&rt->rcu);
	seqlock_release(&rcu_seq);
	rcu_active = false;

	/* clearing rcu_active is before pthread_join in case we hang in
	 * pthread_join & get a SIGTERM or something - in that case, just
	 * ignore the maybe-still-running RCU thread
	 */
	if (pthread_join(rcu_pthread, &retval) == 0) {
		seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
		seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);
		rt->depth = 1;
	}
}
/*
 * RCU'd free functions
 */

void rcu_enqueue(struct rcu_head *rh, const struct rcu_action *action)
{
	/* refer to rcu_bump() for why we need to hold RCU when adding items
	 * to rcu_heads
	 */
	rcu_assert_read_locked();

	rh->action = action;
	rcu_heads_add_tail(&rcu_heads, rh);
	atomic_store_explicit(&rcu_dirty, seqlock_cur(&rcu_seq),
			      memory_order_relaxed);
}
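/* Illustrative sketch (hypothetical caller code): rcu_enqueue() with a
 * custom RCUA_CALL action runs a callback once all readers have left the
 * current epoch.  "struct conn" and conn_finish() are assumptions, not
 * part of this file; the u.call layout matches rcu_do() above.
 *
 *	static void conn_finish(void *arg)
 *	{
 *		struct conn *c = arg;	// recovered via u.call.offset
 *		...
 *	}
 *
 *	static const struct rcu_action conn_rcu_action = {
 *		.type = RCUA_CALL,
 *		.u.call = {
 *			.fptr = conn_finish,
 *			.offset = offsetof(struct conn, rcu_head),
 *		},
 *	};
 *
 *	// caller must hold rcu_read_lock(), see rcu_bump() for why
 *	rcu_enqueue(&c->rcu_head, &conn_rcu_action);
 */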
void rcu_close(struct rcu_head_close *rhc, int fd)
{
	rhc->fd = fd;
	rcu_enqueue(&rhc->rcu_head, &rcua_close);
}
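/* Illustrative sketch (hypothetical caller code): deferring a close(2)
 * until no reader can still be using the fd.  "struct sock" and
 * unlink_sock() are assumptions.
 *
 *	struct sock {
 *		int fd;
 *		struct rcu_head_close rcu_close_head;
 *	};
 *
 *	rcu_read_lock();
 *	unlink_sock(sk);			// no new readers find sk
 *	rcu_close(&sk->rcu_close_head, sk->fd);	// sweeper closes fd later
 *	rcu_read_unlock();
 */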