]> git.proxmox.com Git - mirror_qemu.git/blobdiff - util/rcu.c
rcu: Introduce force_rcu notifier
[mirror_qemu.git] / util / rcu.c
index c9c3e6e4abe9153c8d8bff15f5ec287cea9d43ee..c91da9f137c8523e5dbb4d2ce1da7dfb55a1fb7e 100644 (file)
  * IBM's contributions to this file may be relicensed under LGPLv2 or later.
  */
 
-#include "qemu-common.h"
-#include <stdio.h>
-#include <assert.h>
-#include <stdlib.h>
-#include <stdint.h>
-#include <errno.h>
+#include "qemu/osdep.h"
 #include "qemu/rcu.h"
 #include "qemu/atomic.h"
 #include "qemu/thread.h"
+#include "qemu/main-loop.h"
+#include "qemu/lockable.h"
+#if defined(CONFIG_MALLOC_TRIM)
+#include <malloc.h>
+#endif
 
 /*
  * Global grace period counter.  Bit 0 is always one in rcu_gp_ctr.
@@ -46,7 +46,9 @@
 unsigned long rcu_gp_ctr = RCU_GP_LOCKED;
 
 QemuEvent rcu_gp_event;
-static QemuMutex rcu_gp_lock;
+static int in_drain_call_rcu;
+static QemuMutex rcu_registry_lock;
+static QemuMutex rcu_sync_lock;
 
 /*
  * Check whether a quiescent state was crossed between the beginning of
@@ -56,7 +58,7 @@ static inline int rcu_gp_ongoing(unsigned long *ctr)
 {
     unsigned long v;
 
-    v = atomic_read(ctr);
+    v = qatomic_read(ctr);
     return v && (v != rcu_gp_ctr);
 }
 
@@ -65,7 +67,7 @@ static inline int rcu_gp_ongoing(unsigned long *ctr)
  */
 __thread struct rcu_reader_data rcu_reader;
 
-/* Protected by rcu_gp_lock.  */
+/* Protected by rcu_registry_lock.  */
 typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
 static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
 
@@ -81,18 +83,21 @@ static void wait_for_readers(void)
          */
         qemu_event_reset(&rcu_gp_event);
 
-        /* Instead of using atomic_mb_set for index->waiting, and
-         * atomic_mb_read for index->ctr, memory barriers are placed
+        /* Instead of using qatomic_mb_set for index->waiting, and
+         * qatomic_mb_read for index->ctr, memory barriers are placed
          * manually since writes to different threads are independent.
-         * atomic_mb_set has a smp_wmb before...
+         * qemu_event_reset has acquire semantics, so no memory barrier
+         * is needed here.
          */
-        smp_wmb();
         QLIST_FOREACH(index, &registry, node) {
-            atomic_set(&index->waiting, true);
+            qatomic_set(&index->waiting, true);
         }
 
-        /* ... and a smp_mb after.  */
-        smp_mb();
+        /* Here, order the stores to index->waiting before the loads of
+         * index->ctr.  Pairs with smp_mb_placeholder() in rcu_read_unlock(),
+         * ensuring that the loads of index->ctr are sequentially consistent.
+         */
+        smp_mb_global();
 
         QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
             if (!rcu_gp_ongoing(&index->ctr)) {
@@ -102,21 +107,36 @@ static void wait_for_readers(void)
                 /* No need for mb_set here, worst of all we
                  * get some extra futex wakeups.
                  */
-                atomic_set(&index->waiting, false);
+                qatomic_set(&index->waiting, false);
+            } else if (qatomic_read(&in_drain_call_rcu)) {
+                notifier_list_notify(&index->force_rcu, NULL);
             }
         }
 
-        /* atomic_mb_read has smp_rmb after.  */
-        smp_rmb();
-
         if (QLIST_EMPTY(&registry)) {
             break;
         }
 
-        /* Wait for one thread to report a quiescent state and
-         * try again.
+        /* Wait for one thread to report a quiescent state and try again.
+         * Release rcu_registry_lock, so rcu_(un)register_thread() doesn't
+         * wait too much time.
+         *
+         * rcu_register_thread() may add nodes to &registry; it will not
+         * wake up synchronize_rcu, but that is okay because at least another
+         * thread must exit its RCU read-side critical section before
+         * synchronize_rcu is done.  The next iteration of the loop will
+         * move the new thread's rcu_reader from &registry to &qsreaders,
+         * because rcu_gp_ongoing() will return false.
+         *
+         * rcu_unregister_thread() may remove nodes from &qsreaders instead
+         * of &registry if it runs during qemu_event_wait.  That's okay;
+         * the node then will not be added back to &registry by QLIST_SWAP
+         * below.  The invariant is that the node is part of one list when
+         * rcu_registry_lock is released.
          */
+        qemu_mutex_unlock(&rcu_registry_lock);
         qemu_event_wait(&rcu_gp_event);
+        qemu_mutex_lock(&rcu_registry_lock);
     }
 
     /* put back the reader list in the registry */
@@ -125,10 +145,16 @@ static void wait_for_readers(void)
 
 void synchronize_rcu(void)
 {
-    qemu_mutex_lock(&rcu_gp_lock);
+    QEMU_LOCK_GUARD(&rcu_sync_lock);
 
+    /* Write RCU-protected pointers before reading p_rcu_reader->ctr.
+     * Pairs with smp_mb_placeholder() in rcu_read_lock().
+     */
+    smp_mb_global();
+
+    QEMU_LOCK_GUARD(&rcu_registry_lock);
     if (!QLIST_EMPTY(&registry)) {
-        /* In either case, the atomic_mb_set below blocks stores that free
+        /* In either case, the qatomic_mb_set below blocks stores that free
          * old RCU-protected pointers.
          */
         if (sizeof(rcu_gp_ctr) < 8) {
@@ -137,18 +163,16 @@ void synchronize_rcu(void)
              *
              * Switch parity: 0 -> 1, 1 -> 0.
              */
-            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+            qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
             wait_for_readers();
-            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+            qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
         } else {
             /* Increment current grace period.  */
-            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
+            qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
         }
 
         wait_for_readers();
     }
-
-    qemu_mutex_unlock(&rcu_gp_lock);
 }
 
 
@@ -167,8 +191,8 @@ static void enqueue(struct rcu_head *node)
     struct rcu_head **old_tail;
 
     node->next = NULL;
-    old_tail = atomic_xchg(&tail, &node->next);
-    atomic_mb_set(old_tail, node);
+    old_tail = qatomic_xchg(&tail, &node->next);
+    qatomic_mb_set(old_tail, node);
 }
 
 static struct rcu_head *try_dequeue(void)
@@ -182,7 +206,7 @@ retry:
      * The tail, because it is the first step in the enqueuing.
      * It is only the next pointers that might be inconsistent.
      */
-    if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
+    if (head == &dummy && qatomic_mb_read(&tail) == &dummy.next) {
         abort();
     }
 
@@ -190,7 +214,7 @@ retry:
      * wrong and we need to wait until its enqueuer finishes the update.
      */
     node = head;
-    next = atomic_mb_read(&head->next);
+    next = qatomic_mb_read(&head->next);
     if (!next) {
         return NULL;
     }
@@ -215,40 +239,51 @@ static void *call_rcu_thread(void *opaque)
 {
     struct rcu_head *node;
 
+    rcu_register_thread();
+
     for (;;) {
         int tries = 0;
-        int n = atomic_read(&rcu_call_count);
+        int n = qatomic_read(&rcu_call_count);
 
         /* Heuristically wait for a decent number of callbacks to pile up.
          * Fetch rcu_call_count now, we only must process elements that were
          * added before synchronize_rcu() starts.
          */
-        while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) {
-            g_usleep(100000);
-            qemu_event_reset(&rcu_call_ready_event);
-            n = atomic_read(&rcu_call_count);
-            if (n < RCU_CALL_MIN_SIZE) {
-                qemu_event_wait(&rcu_call_ready_event);
-                n = atomic_read(&rcu_call_count);
+        while (n == 0 || (n < RCU_CALL_MIN_SIZE && ++tries <= 5)) {
+            g_usleep(10000);
+            if (n == 0) {
+                qemu_event_reset(&rcu_call_ready_event);
+                n = qatomic_read(&rcu_call_count);
+                if (n == 0) {
+#if defined(CONFIG_MALLOC_TRIM)
+                    malloc_trim(4 * 1024 * 1024);
+#endif
+                    qemu_event_wait(&rcu_call_ready_event);
+                }
             }
+            n = qatomic_read(&rcu_call_count);
         }
 
-        atomic_sub(&rcu_call_count, n);
+        qatomic_sub(&rcu_call_count, n);
         synchronize_rcu();
+        qemu_mutex_lock_iothread();
         while (n > 0) {
             node = try_dequeue();
             while (!node) {
+                qemu_mutex_unlock_iothread();
                 qemu_event_reset(&rcu_call_ready_event);
                 node = try_dequeue();
                 if (!node) {
                     qemu_event_wait(&rcu_call_ready_event);
                     node = try_dequeue();
                 }
+                qemu_mutex_lock_iothread();
             }
 
             n--;
             node->func(node);
         }
+        qemu_mutex_unlock_iothread();
     }
     abort();
 }
@@ -257,35 +292,164 @@ void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
 {
     node->func = func;
     enqueue(node);
-    atomic_inc(&rcu_call_count);
+    qatomic_inc(&rcu_call_count);
     qemu_event_set(&rcu_call_ready_event);
 }
 
+
+struct rcu_drain {
+    struct rcu_head rcu;
+    QemuEvent drain_complete_event;
+};
+
+static void drain_rcu_callback(struct rcu_head *node)
+{
+    struct rcu_drain *event = (struct rcu_drain *)node;
+    qemu_event_set(&event->drain_complete_event);
+}
+
+/*
+ * This function ensures that all pending RCU callbacks
+ * on the current thread are done executing
+
+ * drops big qemu lock during the wait to allow RCU thread
+ * to process the callbacks
+ *
+ */
+
+void drain_call_rcu(void)
+{
+    struct rcu_drain rcu_drain;
+    bool locked = qemu_mutex_iothread_locked();
+
+    memset(&rcu_drain, 0, sizeof(struct rcu_drain));
+    qemu_event_init(&rcu_drain.drain_complete_event, false);
+
+    if (locked) {
+        qemu_mutex_unlock_iothread();
+    }
+
+
+    /*
+     * RCU callbacks are invoked in the same order as in which they
+     * are registered, thus we can be sure that when 'drain_rcu_callback'
+     * is called, all RCU callbacks that were registered on this thread
+     * prior to calling this function are completed.
+     *
+     * Note that since we have only one global queue of the RCU callbacks,
+     * we also end up waiting for most of RCU callbacks that were registered
+     * on the other threads, but this is a side effect that shoudn't be
+     * assumed.
+     */
+
+    qatomic_inc(&in_drain_call_rcu);
+    call_rcu1(&rcu_drain.rcu, drain_rcu_callback);
+    qemu_event_wait(&rcu_drain.drain_complete_event);
+    qatomic_dec(&in_drain_call_rcu);
+
+    if (locked) {
+        qemu_mutex_lock_iothread();
+    }
+
+}
+
 void rcu_register_thread(void)
 {
     assert(rcu_reader.ctr == 0);
-    qemu_mutex_lock(&rcu_gp_lock);
+    qemu_mutex_lock(&rcu_registry_lock);
     QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
-    qemu_mutex_unlock(&rcu_gp_lock);
+    qemu_mutex_unlock(&rcu_registry_lock);
 }
 
 void rcu_unregister_thread(void)
 {
-    qemu_mutex_lock(&rcu_gp_lock);
+    qemu_mutex_lock(&rcu_registry_lock);
     QLIST_REMOVE(&rcu_reader, node);
-    qemu_mutex_unlock(&rcu_gp_lock);
+    qemu_mutex_unlock(&rcu_registry_lock);
 }
 
-static void __attribute__((__constructor__)) rcu_init(void)
+void rcu_add_force_rcu_notifier(Notifier *n)
+{
+    qemu_mutex_lock(&rcu_registry_lock);
+    notifier_list_add(&rcu_reader.force_rcu, n);
+    qemu_mutex_unlock(&rcu_registry_lock);
+}
+
+void rcu_remove_force_rcu_notifier(Notifier *n)
+{
+    qemu_mutex_lock(&rcu_registry_lock);
+    notifier_remove(n);
+    qemu_mutex_unlock(&rcu_registry_lock);
+}
+
+static void rcu_init_complete(void)
 {
     QemuThread thread;
 
-    qemu_mutex_init(&rcu_gp_lock);
+    qemu_mutex_init(&rcu_registry_lock);
+    qemu_mutex_init(&rcu_sync_lock);
     qemu_event_init(&rcu_gp_event, true);
 
     qemu_event_init(&rcu_call_ready_event, false);
+
+    /* The caller is assumed to have iothread lock, so the call_rcu thread
+     * must have been quiescent even after forking, just recreate it.
+     */
     qemu_thread_create(&thread, "call_rcu", call_rcu_thread,
                        NULL, QEMU_THREAD_DETACHED);
 
     rcu_register_thread();
 }
+
+static int atfork_depth = 1;
+
+void rcu_enable_atfork(void)
+{
+    atfork_depth++;
+}
+
+void rcu_disable_atfork(void)
+{
+    atfork_depth--;
+}
+
+#ifdef CONFIG_POSIX
+static void rcu_init_lock(void)
+{
+    if (atfork_depth < 1) {
+        return;
+    }
+
+    qemu_mutex_lock(&rcu_sync_lock);
+    qemu_mutex_lock(&rcu_registry_lock);
+}
+
+static void rcu_init_unlock(void)
+{
+    if (atfork_depth < 1) {
+        return;
+    }
+
+    qemu_mutex_unlock(&rcu_registry_lock);
+    qemu_mutex_unlock(&rcu_sync_lock);
+}
+
+static void rcu_init_child(void)
+{
+    if (atfork_depth < 1) {
+        return;
+    }
+
+    memset(&registry, 0, sizeof(registry));
+    rcu_init_complete();
+}
+#endif
+
+static void __attribute__((__constructor__)) rcu_init(void)
+{
+    smp_mb_global_init();
+#ifdef CONFIG_POSIX
+    pthread_atfork(rcu_init_lock, rcu_init_unlock, rcu_init_child);
+#endif
+    rcu_init_complete();
+}