/*
 * Copyright (c) 2017-19  David Lamparter, for NetDEF, Inc.
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */
/* implementation notes: this is an epoch-based RCU implementation.  rcu_seq
 * (global variable) counts the current epoch.  Threads hold a specific epoch
 * in rcu_read_lock().  This is the oldest epoch a thread might be accessing
 * data of.
 *
 * The rcu_seq global is only pushed forward on rcu_read_lock() and
 * rcu_read_unlock() calls.  This makes things a tad more efficient since
 * those are the only places it matters:
 * - on rcu_read_lock, we don't want to hold an old epoch pointlessly
 * - on rcu_read_unlock, we want to make sure we're not stuck on an old epoch
 *   when heading into a long idle period where no thread holds RCU
 *
 * rcu_thread structures themselves are RCU-free'd.
 *
 * rcu_head structures are the most iffy;  normally for an ATOMLIST we would
 * need to make sure we use rcu_free or pthread_rwlock to deallocate old items
 * to prevent ABA or use-after-free problems.  However, our ATOMLIST code
 * guarantees that if the list remains non-empty in all cases, we only need
 * the "last" pointer to do an "add_tail()", i.e. we can't run into ABA/UAF
 * issues - but we do need to keep at least 1 item on the list.
 *
 * (Search the atomlist code for all uses of "last")
 */
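/* Reader-side usage sketch (illustrative only; "struct foo", "foo_lookup",
 * and MTYPE_FOO are hypothetical caller-side names, not part of this file):
 *
 *   rcu_read_lock();
 *   struct foo *f = foo_lookup(key);  // stays valid while RCU is held
 *   use(f);
 *   rcu_read_unlock();                // f may be freed any time after this
 *
 * A writer first unlinks the item from its data structure, then hands the
 * memory to the RCU sweeper, e.g. via the rcu_free() wrapper in frrcu.h.
 */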
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <pthread.h>
#ifdef HAVE_PTHREAD_NP_H
#include <pthread_np.h>
#endif
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <time.h>

#include "frrcu.h"
#include "memory.h"
#include "seqlock.h"
#include "atomlist.h"
DEFINE_MTYPE_STATIC(LIB, RCU_THREAD, "RCU thread");
DEFINE_MTYPE_STATIC(LIB, RCU_NEXT, "RCU sequence barrier");
DECLARE_ATOMLIST(rcu_heads, struct rcu_head, head);
PREDECL_ATOMLIST(rcu_threads);

struct rcu_thread {
	struct rcu_threads_item head;

	struct rcu_head rcu_head;

	struct seqlock rcu;

	/* only accessed by thread itself, not atomic */
	unsigned depth;
};

DECLARE_ATOMLIST(rcu_threads, struct rcu_thread, head);
static const struct rcu_action rcua_next  = { .type = RCUA_NEXT };
static const struct rcu_action rcua_end   = { .type = RCUA_END };
static const struct rcu_action rcua_close = { .type = RCUA_CLOSE };
struct rcu_next {
	struct rcu_head head_free;
	struct rcu_head head_next;
};
#define rcu_free_internal(mtype, ptr, field)                                   \
	do {                                                                   \
		typeof(ptr) _ptr = (ptr);                                      \
		struct rcu_head *_rcu_head = &_ptr->field;                     \
		static const struct rcu_action _rcu_action = {                 \
			.type = RCUA_FREE,                                     \
			.u.free = {                                            \
				.mt = mtype,                                   \
				.offset = offsetof(typeof(*_ptr), field),      \
			},                                                     \
		};                                                             \
		_rcu_head->action = &_rcu_action;                              \
		rcu_heads_add_tail(&rcu_heads, _rcu_head);                     \
	} while (0)
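/* Expansion-site sketch (hypothetical struct & MTYPE, for illustration):
 *
 *   struct foo { int x; struct rcu_head rcu_head; };
 *   struct foo *f;
 *   ...
 *   rcu_free_internal(MTYPE_FOO, f, rcu_head);
 *
 * The static _rcu_action works because mtype and the field offset are
 * compile-time constants at any given expansion site, so one action object
 * per site suffices for all items queued there.
 */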
/* primary global RCU position */
static struct seqlock rcu_seq;
/* this is set to rcu_seq whenever something is added on the RCU queue.
 * rcu_read_lock() and rcu_read_unlock() will then bump rcu_seq up one step.
 */
static _Atomic seqlock_val_t rcu_dirty;

static struct rcu_threads_head rcu_threads;
static struct rcu_heads_head rcu_heads;
/* main thread & RCU sweeper have pre-setup rcu_thread structures.  The
 * reasons are different:
 *
 * - rcu_thread_main is there because the main thread isn't started like
 *   other threads, it's implicitly created when the program is started.  So
 *   rcu_thread_main matches up implicitly.
 *
 * - rcu_thread_rcu isn't actually put on the rcu_threads list (makes no
 *   sense really), it only exists so we can call RCU-using functions from
 *   the RCU thread without special handling in rcu_read_lock/unlock.
 */
static struct rcu_thread rcu_thread_main;
static struct rcu_thread rcu_thread_rcu;
static pthread_t rcu_pthread;
static pthread_key_t rcu_thread_key;
static bool rcu_active;
static void rcu_start(void);
static void rcu_bump(void);
/*
 * preinitialization for main thread
 */
static void rcu_thread_end(void *rcu_thread);

static void rcu_preinit(void) __attribute__((constructor));
static void rcu_preinit(void)
{
	struct rcu_thread *rt;

	rt = &rcu_thread_main;
	rt->depth = 1;
	seqlock_init(&rt->rcu);
	seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);

	pthread_key_create(&rcu_thread_key, rcu_thread_end);
	pthread_setspecific(rcu_thread_key, rt);

	rcu_threads_add_tail(&rcu_threads, rt);

	/* RCU sweeper's rcu_thread is a dummy, NOT added to rcu_threads */
	rt = &rcu_thread_rcu;
	rt->depth = 1;

	seqlock_init(&rcu_seq);
	seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
}
static struct rcu_thread *rcu_self(void)
{
	return (struct rcu_thread *)pthread_getspecific(rcu_thread_key);
}
/*
 * thread management (for the non-main thread)
 */
struct rcu_thread *rcu_thread_prepare(void)
{
	struct rcu_thread *rt, *cur;

	rcu_assert_read_locked();

	if (!rcu_active)
		rcu_start();

	cur = rcu_self();

	/* new thread always starts with rcu_read_lock held at depth 1, and
	 * holding the same epoch as the parent (this makes it possible to
	 * use RCU for things passed into the thread through its arg)
	 */
	rt = XCALLOC(MTYPE_RCU_THREAD, sizeof(*rt));
	rt->depth = 1;

	seqlock_init(&rt->rcu);
	seqlock_acquire(&rt->rcu, &cur->rcu);

	rcu_threads_add_tail(&rcu_threads, rt);

	return rt;
}
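/* Typical spawn sequence, as a sketch ("worker_fn" and the direct
 * pthread_create call are hypothetical; actual callers may go through a
 * wrapper):
 *
 *   rcu_read_lock();
 *   struct rcu_thread *rt = rcu_thread_prepare();
 *   pthread_create(&pt, NULL, worker_fn, rt);  // worker_fn calls
 *   rcu_read_unlock();                         // rcu_thread_start(rt) first
 *
 * The child inherits the parent's epoch at depth 1, so RCU-protected data
 * handed over via the thread argument stays valid until the child itself
 * calls rcu_read_unlock().
 */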
void rcu_thread_start(struct rcu_thread *rt)
{
	pthread_setspecific(rcu_thread_key, rt);
}
void rcu_thread_unprepare(struct rcu_thread *rt)
{
	if (rt == &rcu_thread_rcu)
		return;

	rt->depth = 1;
	seqlock_acquire(&rt->rcu, &rcu_seq);

	rcu_bump();
	if (rt != &rcu_thread_main)
		/* this free() happens after seqlock_release() below */
		rcu_free_internal(MTYPE_RCU_THREAD, rt, rcu_head);

	rcu_threads_del(&rcu_threads, rt);
	seqlock_release(&rt->rcu);
}
static void rcu_thread_end(void *rtvoid)
{
	struct rcu_thread *rt = rtvoid;

	rcu_thread_unprepare(rt);
}
/*
 * main RCU control aspects
 */

static void rcu_bump(void)
{
	struct rcu_next *rn;

	rn = XMALLOC(MTYPE_RCU_NEXT, sizeof(*rn));
	/* note: each RCUA_NEXT item corresponds to exactly one seqno bump.
	 * This means we don't need to communicate which seqno is which
	 * RCUA_NEXT, since we really don't care.
	 */
	/* Important race condition:  while rcu_heads_add_tail is executing,
	 * there is an intermediate point where the rcu_heads "last" pointer
	 * already points to rn->head_next, but rn->head_next isn't added to
	 * the list yet.  That means any other "add_tail" calls append to this
	 * item, which isn't fully on the list yet.  Freeze this thread at
	 * that point and look at another thread doing a rcu_bump.  It adds
	 * these two items and then does a seqlock_bump.  But the rcu_heads
	 * list is still "interrupted" and there's no RCUA_NEXT on the list
	 * yet (from either the frozen thread or the second thread).  So
	 * rcu_main() might actually hit the end of the list at the
	 * "interrupt".
	 *
	 * This situation is prevented by requiring that rcu_read_lock is held
	 * for any calls to rcu_bump, since if we're holding the current RCU
	 * epoch, that means rcu_main can't be chewing on rcu_heads and hit
	 * that interruption point.  Only by the time the thread has continued
	 * to rcu_read_unlock() - and therefore completed the add_tail - the
	 * RCU sweeper gobbles up the epoch and can be sure to find at least
	 * the RCUA_NEXT and RCUA_FREE items on rcu_heads.
	 */
	rn->head_next.action = &rcua_next;
	rcu_heads_add_tail(&rcu_heads, &rn->head_next);
	/* free rn that we allocated above.
	 *
	 * This is INTENTIONALLY not built into the RCUA_NEXT action.  This
	 * ensures that after the action above is popped off the queue, there
	 * is still at least 1 item on the RCU queue.  This means we never
	 * delete the last item, which is extremely important since it keeps
	 * the atomlist ->last pointer alive and well.
	 *
	 * If we were to "run dry" on the RCU queue, add_tail may run into the
	 * "last item is being deleted - start over" case, and then we may end
	 * up accessing old RCU queue items that are already free'd.
	 */
	rcu_free_internal(MTYPE_RCU_NEXT, rn, head_free);
	/* Only allow the RCU sweeper to run after these 2 items are queued.
	 *
	 * If another thread enqueues some RCU action in the intermediate
	 * window here, nothing bad happens - the queued action is associated
	 * with a larger seq# than strictly necessary.  Thus, it might get
	 * executed a bit later, but that's not a problem.
	 *
	 * If another thread acquires the read lock in this window, it holds
	 * the previous epoch, but its RCU queue actions will be in the next
	 * epoch.  This isn't a problem either, just a tad inefficient.
	 */
	seqlock_bump(&rcu_seq);
}
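/* After rcu_bump() returns, the queue tail looks like this (sketch):
 *
 *   ... -> [RCUA_NEXT  = rn->head_next] -> [RCUA_FREE rn = rn->head_free]
 *
 * so when the sweeper pops the RCUA_NEXT epoch marker, the RCUA_FREE item
 * behind it still keeps the list non-empty, preserving the atomlist "last"
 * pointer invariant described above.
 */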
static void rcu_bump_maybe(void)
{
	seqlock_val_t dirty;

	dirty = atomic_load_explicit(&rcu_dirty, memory_order_relaxed);
	/* no problem if we race here and multiple threads bump rcu_seq;
	 * bumping too much causes no issues while not bumping enough will
	 * result in delayed cleanup
	 */
	if (dirty == seqlock_cur(&rcu_seq))
		rcu_bump();
}
void rcu_read_lock(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt);
	if (rt->depth++ > 0)
		return;

	seqlock_acquire(&rt->rcu, &rcu_seq);
	/* need to hold RCU for bump ... */
	rcu_bump_maybe();
	/* ... but no point in holding the old epoch if we just bumped */
	seqlock_acquire(&rt->rcu, &rcu_seq);
}
void rcu_read_unlock(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt && rt->depth);
	if (--rt->depth > 0)
		return;

	rcu_bump_maybe();
	seqlock_release(&rt->rcu);
}
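/* rcu_read_lock()/rcu_read_unlock() nest; only the outermost pair touches
 * the thread's seqlock.  Sketch:
 *
 *   rcu_read_lock();      // depth 0 -> 1, acquires current epoch
 *   rcu_read_lock();      // depth 1 -> 2, no seqlock operation
 *   rcu_read_unlock();    // depth 2 -> 1, epoch still held
 *   rcu_read_unlock();    // depth 1 -> 0, releases epoch
 */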
void rcu_assert_read_locked(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt && rt->depth && seqlock_held(&rt->rcu));
}
void rcu_assert_read_unlocked(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt && !rt->depth && !seqlock_held(&rt->rcu));
}
/*
 * RCU resource-release thread
 */

static void *rcu_main(void *arg);
static void rcu_start(void)
{
	/* ensure we never handle signals on the RCU thread by blocking
	 * everything here (new thread inherits signal mask)
	 */
	sigset_t oldsigs, blocksigs;

	sigfillset(&blocksigs);
	pthread_sigmask(SIG_BLOCK, &blocksigs, &oldsigs);

	rcu_active = true;

	assert(!pthread_create(&rcu_pthread, NULL, rcu_main, NULL));

	pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);

#ifdef HAVE_PTHREAD_SETNAME_NP
# ifdef GNU_LINUX
	pthread_setname_np(rcu_pthread, "RCU sweeper");
# elif defined(__NetBSD__)
	pthread_setname_np(rcu_pthread, "RCU sweeper", NULL);
# endif
#elif defined(HAVE_PTHREAD_SET_NAME_NP)
	pthread_set_name_np(rcu_pthread, "RCU sweeper");
#endif
}
static void rcu_do(struct rcu_head *rh)
{
	struct rcu_head_close *rhc;
	void *p;

	switch (rh->action->type) {
	case RCUA_FREE:
		p = (char *)rh - rh->action->u.free.offset;
		if (rh->action->u.free.mt)
			qfree(rh->action->u.free.mt, p);
		else
			free(p);
		break;
	case RCUA_CLOSE:
		rhc = container_of(rh, struct rcu_head_close,
				   rcu_head);
		close(rhc->fd);
		break;
	case RCUA_CALL:
		p = (char *)rh - rh->action->u.call.offset;
		rh->action->u.call.fptr(p);
		break;

	default:
		/* RCUA_NEXT/RCUA_END are handled in rcu_main() */
		assert(0);
	}
}
static void rcu_watchdog(struct rcu_thread *rt)
{
	/* future work: print a backtrace for the thread that's holding up
	 * RCU.  The only (good) way of doing that is to send a signal to the
	 * other thread, save away the backtrace in the signal handler, and
	 * block here until the signal is done processing.
	 *
	 * Just haven't implemented that yet.
	 */
	fprintf(stderr, "RCU watchdog %p\n", rt);
}
static void *rcu_main(void *arg)
{
	struct rcu_thread *rt;
	struct rcu_head *rh = NULL;
	bool end = false;
	struct timespec maxwait;

	seqlock_val_t rcuval = SEQLOCK_STARTVAL;

	pthread_setspecific(rcu_thread_key, &rcu_thread_rcu);

	while (!end) {
		seqlock_wait(&rcu_seq, rcuval);

		/* RCU watchdog timeout, TODO: configurable value */
		clock_gettime(CLOCK_MONOTONIC, &maxwait);
		maxwait.tv_nsec += 100 * 1000 * 1000;
		if (maxwait.tv_nsec >= 1000000000) {
			maxwait.tv_sec++;
			maxwait.tv_nsec -= 1000000000;
		}

		frr_each (rcu_threads, &rcu_threads, rt)
			if (!seqlock_timedwait(&rt->rcu, rcuval, &maxwait)) {
				rcu_watchdog(rt);
				seqlock_wait(&rt->rcu, rcuval);
			}

		while ((rh = rcu_heads_pop(&rcu_heads))) {
			if (rh->action->type == RCUA_NEXT)
				break;
			else if (rh->action->type == RCUA_END)
				end = true;
			else
				rcu_do(rh);
		}

		rcuval += SEQLOCK_INCR;
	}

	/* rcu_shutdown can only be called singlethreaded, and it does a
	 * pthread_join, so it should be impossible that anything ended up
	 * on the queue after RCUA_END
	 */
#if 1
	assert(!rcu_heads_first(&rcu_heads));
#else
	while ((rh = rcu_heads_pop(&rcu_heads)))
		if (rh->action->type >= RCUA_FREE)
			rcu_do(rh);
#endif
	return NULL;
}
void rcu_shutdown(void)
{
	static struct rcu_head rcu_head_end;
	struct rcu_thread *rt = rcu_self();
	void *retval;

	if (!rcu_active)
		return;

	rcu_assert_read_locked();
	assert(rcu_threads_count(&rcu_threads) == 1);

	rcu_enqueue(&rcu_head_end, &rcua_end);

	rt->depth = 0;
	seqlock_release(&rt->rcu);
	seqlock_release(&rcu_seq);
	rcu_active = false;

	/* clearing rcu_active is before pthread_join in case we hang in
	 * pthread_join & get a SIGTERM or something - in that case, just
	 * ignore the maybe-still-running RCU thread
	 */
	if (pthread_join(rcu_pthread, &retval) == 0) {
		seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
		seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);
		rt->depth = 1;
	}
}
/*
 * RCU'd free functions
 */

void rcu_enqueue(struct rcu_head *rh, const struct rcu_action *action)
{
	/* refer to rcu_bump() for why we need to hold RCU when adding items
	 * to rcu_heads
	 */
	rcu_assert_read_locked();

	rh->action = action;
	rcu_heads_add_tail(&rcu_heads, rh);
	atomic_store_explicit(&rcu_dirty, seqlock_cur(&rcu_seq),
			      memory_order_relaxed);
}
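/* Caller-defined action sketch (the "mything" names are hypothetical).
 * RCUA_CALL invokes a callback on the enclosing object once its epoch is
 * retired, matching what rcu_do() does above:
 *
 *   static void mything_finish(void *p)
 *   {
 *           struct mything *m = p;
 *           // final teardown of m
 *   }
 *   static const struct rcu_action mything_action = {
 *           .type = RCUA_CALL,
 *           .u.call = {
 *                   .fptr = mything_finish,
 *                   .offset = offsetof(struct mything, rcu_head),
 *           },
 *   };
 *   ...
 *   rcu_enqueue(&m->rcu_head, &mything_action);   // under rcu_read_lock
 */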
void rcu_close(struct rcu_head_close *rhc, int fd)
{
	rhc->fd = fd;
	rcu_enqueue(&rhc->rcu_head, &rcua_close);
}
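/* Usage sketch: instead of close(fd) while readers may still be using the
 * descriptor, embed a struct rcu_head_close in the owning object ("obj" is
 * a hypothetical caller structure) and defer the close to the sweeper:
 *
 *   rcu_read_lock();
 *   rcu_close(&obj->rhc, obj->fd);   // fd closed after the epoch retires
 *   rcu_read_unlock();
 */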