]> git.proxmox.com Git - mirror_qemu.git/blobdiff - util/rcu.c
util/defer-call: move defer_call() to util/
[mirror_qemu.git] / util / rcu.c
index 13ac0f75cb2a7de3b61f13174fb545b8fd08e314..e587bcc48314b81a5a79bc5c399e88a3cefec245 100644 (file)
@@ -46,6 +46,7 @@
 unsigned long rcu_gp_ctr = RCU_GP_LOCKED;
 
 QemuEvent rcu_gp_event;
+static int in_drain_call_rcu;
 static QemuMutex rcu_registry_lock;
 static QemuMutex rcu_sync_lock;
 
@@ -64,7 +65,7 @@ static inline int rcu_gp_ongoing(unsigned long *ctr)
 /* Written to only by each individual reader. Read by both the reader and the
  * writers.
  */
-__thread struct rcu_reader_data rcu_reader;
+QEMU_DEFINE_CO_TLS(struct rcu_reader_data, rcu_reader)
 
 /* Protected by rcu_registry_lock.  */
 typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
@@ -82,12 +83,6 @@ static void wait_for_readers(void)
          */
         qemu_event_reset(&rcu_gp_event);
 
-        /* Instead of using qatomic_mb_set for index->waiting, and
-         * qatomic_mb_read for index->ctr, memory barriers are placed
-         * manually since writes to different threads are independent.
-         * qemu_event_reset has acquire semantics, so no memory barrier
-         * is needed here.
-         */
         QLIST_FOREACH(index, &registry, node) {
             qatomic_set(&index->waiting, true);
         }
@@ -95,6 +90,10 @@ static void wait_for_readers(void)
         /* Here, order the stores to index->waiting before the loads of
          * index->ctr.  Pairs with smp_mb_placeholder() in rcu_read_unlock(),
          * ensuring that the loads of index->ctr are sequentially consistent.
+         *
+         * If this is the last iteration, this barrier also prevents
+         * frees from seeping upwards, and orders the two wait phases
+         * on architectures with 32-bit longs; see synchronize_rcu().
          */
         smp_mb_global();
 
@@ -103,10 +102,12 @@ static void wait_for_readers(void)
                 QLIST_REMOVE(index, node);
                 QLIST_INSERT_HEAD(&qsreaders, index, node);
 
-                /* No need for mb_set here, worst of all we
+                /* No need for memory barriers here, worst of all we
                  * get some extra futex wakeups.
                  */
                 qatomic_set(&index->waiting, false);
+            } else if (qatomic_read(&in_drain_call_rcu)) {
+                notifier_list_notify(&index->force_rcu, NULL);
             }
         }
 
@@ -146,26 +147,26 @@ void synchronize_rcu(void)
 
     /* Write RCU-protected pointers before reading p_rcu_reader->ctr.
      * Pairs with smp_mb_placeholder() in rcu_read_lock().
+     *
+     * Also orders write to RCU-protected pointers before
+     * write to rcu_gp_ctr.
      */
     smp_mb_global();
 
     QEMU_LOCK_GUARD(&rcu_registry_lock);
     if (!QLIST_EMPTY(&registry)) {
-        /* In either case, the qatomic_mb_set below blocks stores that free
-         * old RCU-protected pointers.
-         */
         if (sizeof(rcu_gp_ctr) < 8) {
             /* For architectures with 32-bit longs, a two-subphases algorithm
              * ensures we do not encounter overflow bugs.
              *
              * Switch parity: 0 -> 1, 1 -> 0.
              */
-            qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+            qatomic_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
             wait_for_readers();
-            qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+            qatomic_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
         } else {
             /* Increment current grace period.  */
-            qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
+            qatomic_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
         }
 
         wait_for_readers();
@@ -188,8 +189,22 @@ static void enqueue(struct rcu_head *node)
     struct rcu_head **old_tail;
 
     node->next = NULL;
+
+    /*
+     * Make this node the tail of the list.  The node will be
+     * used by further enqueue operations, but it will not
+     * be dequeued yet...
+     */
     old_tail = qatomic_xchg(&tail, &node->next);
-    qatomic_mb_set(old_tail, node);
+
+    /*
+     * ... until it is pointed to from another item in the list.
+     * In the meantime, try_dequeue() will find a NULL next pointer
+     * and loop.
+     *
+     * Synchronizes with qatomic_load_acquire() in try_dequeue().
+     */
+    qatomic_store_release(old_tail, node);
 }
 
 static struct rcu_head *try_dequeue(void)
@@ -197,26 +212,31 @@ static struct rcu_head *try_dequeue(void)
     struct rcu_head *node, *next;
 
 retry:
-    /* Test for an empty list, which we do not expect.  Note that for
+    /* Head is only written by this thread, so no need for barriers.  */
+    node = head;
+
+    /*
+     * If the head node has NULL in its next pointer, the value is
+     * wrong and we need to wait until its enqueuer finishes the update.
+     */
+    next = qatomic_load_acquire(&node->next);
+    if (!next) {
+        return NULL;
+    }
+
+    /*
+     * Test for an empty list, which we do not expect.  Note that for
      * the consumer head and tail are always consistent.  The head
      * is consistent because only the consumer reads/writes it.
      * The tail, because it is the first step in the enqueuing.
      * It is only the next pointers that might be inconsistent.
      */
-    if (head == &dummy && qatomic_mb_read(&tail) == &dummy.next) {
+    if (head == &dummy && qatomic_read(&tail) == &dummy.next) {
         abort();
     }
 
-    /* If the head node has NULL in its next pointer, the value is
-     * wrong and we need to wait until its enqueuer finishes the update.
-     */
-    node = head;
-    next = qatomic_mb_read(&head->next);
-    if (!next) {
-        return NULL;
-    }
-
-    /* Since we are the sole consumer, and we excluded the empty case
+    /*
+     * Since we are the sole consumer, and we excluded the empty case
      * above, the queue will always have at least two nodes: the
      * dummy node, and the one being removed.  So we do not need to update
      * the tail pointer.
@@ -335,12 +355,14 @@ void drain_call_rcu(void)
      *
      * Note that since we have only one global queue of the RCU callbacks,
      * we also end up waiting for most of RCU callbacks that were registered
-     * on the other threads, but this is a side effect that shoudn't be
+     * on the other threads, but this is a side effect that shouldn't be
      * assumed.
      */
 
+    qatomic_inc(&in_drain_call_rcu);
     call_rcu1(&rcu_drain.rcu, drain_rcu_callback);
     qemu_event_wait(&rcu_drain.drain_complete_event);
+    qatomic_dec(&in_drain_call_rcu);
 
     if (locked) {
         qemu_mutex_lock_iothread();
@@ -350,16 +372,30 @@ void drain_call_rcu(void)
 
 void rcu_register_thread(void)
 {
-    assert(rcu_reader.ctr == 0);
+    assert(get_ptr_rcu_reader()->ctr == 0);
     qemu_mutex_lock(&rcu_registry_lock);
-    QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
+    QLIST_INSERT_HEAD(&registry, get_ptr_rcu_reader(), node);
     qemu_mutex_unlock(&rcu_registry_lock);
 }
 
 void rcu_unregister_thread(void)
 {
     qemu_mutex_lock(&rcu_registry_lock);
-    QLIST_REMOVE(&rcu_reader, node);
+    QLIST_REMOVE(get_ptr_rcu_reader(), node);
+    qemu_mutex_unlock(&rcu_registry_lock);
+}
+
+void rcu_add_force_rcu_notifier(Notifier *n)
+{
+    qemu_mutex_lock(&rcu_registry_lock);
+    notifier_list_add(&get_ptr_rcu_reader()->force_rcu, n);
+    qemu_mutex_unlock(&rcu_registry_lock);
+}
+
+void rcu_remove_force_rcu_notifier(Notifier *n)
+{
+    qemu_mutex_lock(&rcu_registry_lock);
+    notifier_remove(n);
     qemu_mutex_unlock(&rcu_registry_lock);
 }