]> git.proxmox.com Git - mirror_qemu.git/blobdiff - util/qht.c
migration: Handle block device inactivation failures better
[mirror_qemu.git] / util / qht.c
index aa51be3c52f891dc55990231ba959a9c585b5b77..92c6b787593d60128bd42561d65ea15b4a37c56d 100644 (file)
@@ -49,7 +49,7 @@
  * it anymore.
  *
  * Writers check for concurrent resizes by comparing ht->map before and after
- * acquiring their bucket lock. If they don't match, a resize has occured
+ * acquiring their bucket lock. If they don't match, a resize has occurred
  * while the bucket spinlock was being acquired.
  *
  * Related Work:
@@ -69,6 +69,7 @@
 #include "qemu/qht.h"
 #include "qemu/atomic.h"
 #include "qemu/rcu.h"
+#include "qemu/memalign.h"
 
 //#define QHT_DEBUG
 
@@ -131,11 +132,11 @@ static inline void qht_unlock(struct qht *ht)
 
 /*
  * Note: reading partially-updated pointers in @pointers could lead to
- * segfaults. We thus access them with atomic_read/set; this guarantees
+ * segfaults. We thus access them with qatomic_read/set; this guarantees
  * that the compiler makes all those accesses atomic. We also need the
- * volatile-like behavior in atomic_read, since otherwise the compiler
+ * volatile-like behavior in qatomic_read, since otherwise the compiler
  * might refetch the pointer.
- * atomic_read's are of course not necessary when the bucket lock is held.
+ * qatomic_read's are of course not necessary when the bucket lock is held.
  *
  * If both ht->lock and b->lock are grabbed, ht->lock should always
  * be grabbed first.
@@ -150,6 +151,22 @@ struct qht_bucket {
 
 QEMU_BUILD_BUG_ON(sizeof(struct qht_bucket) > QHT_BUCKET_ALIGN);
 
+/*
+ * Under TSAN, we use striped locks instead of one lock per bucket chain.
+ * This avoids crashing under TSAN, since TSAN aborts the program if more than
+ * 64 locks are held (this is a hardcoded limit in TSAN).
+ * When resizing a QHT we grab all the buckets' locks, which can easily
+ * go over TSAN's limit. By using striped locks, we avoid this problem.
+ *
+ * Note: this number must be a power of two for easy index computation.
+ */
+#define QHT_TSAN_BUCKET_LOCKS_BITS 4
+#define QHT_TSAN_BUCKET_LOCKS (1 << QHT_TSAN_BUCKET_LOCKS_BITS)
+
+struct qht_tsan_lock {
+    QemuSpin lock;
+} QEMU_ALIGNED(QHT_BUCKET_ALIGN);
+
 /**
  * struct qht_map - structure to track an array of buckets
  * @rcu: used by RCU. Keep it as the top field in the struct to help valgrind
@@ -159,6 +176,7 @@ QEMU_BUILD_BUG_ON(sizeof(struct qht_bucket) > QHT_BUCKET_ALIGN);
  * @n_added_buckets: number of added (i.e. "non-head") buckets
  * @n_added_buckets_threshold: threshold to trigger an upward resize once the
  *                             number of added buckets surpasses it.
+ * @tsan_bucket_locks: Array of striped locks to be used only under TSAN.
  *
  * Buckets are tracked in what we call a "map", i.e. this structure.
  */
@@ -168,6 +186,9 @@ struct qht_map {
     size_t n_buckets;
     size_t n_added_buckets;
     size_t n_added_buckets_threshold;
+#ifdef CONFIG_TSAN
+    struct qht_tsan_lock tsan_bucket_locks[QHT_TSAN_BUCKET_LOCKS];
+#endif
 };
 
 /* trigger a resize when n_added_buckets > n_buckets / div */
@@ -228,10 +249,56 @@ static inline size_t qht_elems_to_buckets(size_t n_elems)
     return pow2ceil(n_elems / QHT_BUCKET_ENTRIES);
 }
 
-static inline void qht_head_init(struct qht_bucket *b)
+/*
+ * When using striped locks (i.e. under TSAN), we have to be careful not
+ * to operate on the same lock twice (e.g. when iterating through all buckets).
+ * We achieve this by operating only on each stripe's first matching lock.
+ */
+static inline void qht_do_if_first_in_stripe(struct qht_map *map,
+                                             struct qht_bucket *b,
+                                             void (*func)(QemuSpin *spin))
+{
+#ifdef CONFIG_TSAN
+    unsigned long bucket_idx = b - map->buckets;
+    bool is_first_in_stripe = (bucket_idx >> QHT_TSAN_BUCKET_LOCKS_BITS) == 0;
+    if (is_first_in_stripe) {
+        unsigned long lock_idx = bucket_idx & (QHT_TSAN_BUCKET_LOCKS - 1);
+        func(&map->tsan_bucket_locks[lock_idx].lock);
+    }
+#else
+    func(&b->lock);
+#endif
+}
+
+static inline void qht_bucket_lock_do(struct qht_map *map,
+                                      struct qht_bucket *b,
+                                      void (*func)(QemuSpin *lock))
+{
+#ifdef CONFIG_TSAN
+    unsigned long bucket_idx = b - map->buckets;
+    unsigned long lock_idx = bucket_idx & (QHT_TSAN_BUCKET_LOCKS - 1);
+    func(&map->tsan_bucket_locks[lock_idx].lock);
+#else
+    func(&b->lock);
+#endif
+}
+
+static inline void qht_bucket_lock(struct qht_map *map,
+                                   struct qht_bucket *b)
+{
+    qht_bucket_lock_do(map, b, qemu_spin_lock);
+}
+
+static inline void qht_bucket_unlock(struct qht_map *map,
+                                     struct qht_bucket *b)
+{
+    qht_bucket_lock_do(map, b, qemu_spin_unlock);
+}
+
+static inline void qht_head_init(struct qht_map *map, struct qht_bucket *b)
 {
     memset(b, 0, sizeof(*b));
-    qemu_spin_init(&b->lock);
+    qht_do_if_first_in_stripe(map, b, qemu_spin_init);
     seqlock_init(&b->sequence);
 }
 
@@ -249,7 +316,7 @@ static void qht_map_lock_buckets(struct qht_map *map)
     for (i = 0; i < map->n_buckets; i++) {
         struct qht_bucket *b = &map->buckets[i];
 
-        qemu_spin_lock(&b->lock);
+        qht_do_if_first_in_stripe(map, b, qemu_spin_lock);
     }
 }
 
@@ -260,7 +327,7 @@ static void qht_map_unlock_buckets(struct qht_map *map)
     for (i = 0; i < map->n_buckets; i++) {
         struct qht_bucket *b = &map->buckets[i];
 
-        qemu_spin_unlock(&b->lock);
+        qht_do_if_first_in_stripe(map, b, qemu_spin_unlock);
     }
 }
 
@@ -286,7 +353,7 @@ void qht_map_lock_buckets__no_stale(struct qht *ht, struct qht_map **pmap)
 {
     struct qht_map *map;
 
-    map = atomic_rcu_read(&ht->map);
+    map = qatomic_rcu_read(&ht->map);
     qht_map_lock_buckets(map);
     if (likely(!qht_map_is_stale__locked(ht, map))) {
         *pmap = map;
@@ -307,7 +374,7 @@ void qht_map_lock_buckets__no_stale(struct qht *ht, struct qht_map **pmap)
  * Get a head bucket and lock it, making sure its parent map is not stale.
  * @pmap is filled with a pointer to the bucket's parent map.
  *
- * Unlock with qemu_spin_unlock(&b->lock).
+ * Unlock with qht_bucket_unlock.
  *
  * Note: callers cannot have ht->lock held.
  */
@@ -318,21 +385,21 @@ struct qht_bucket *qht_bucket_lock__no_stale(struct qht *ht, uint32_t hash,
     struct qht_bucket *b;
     struct qht_map *map;
 
-    map = atomic_rcu_read(&ht->map);
+    map = qatomic_rcu_read(&ht->map);
     b = qht_map_to_bucket(map, hash);
 
-    qemu_spin_lock(&b->lock);
+    qht_bucket_lock(map, b);
     if (likely(!qht_map_is_stale__locked(ht, map))) {
         *pmap = map;
         return b;
     }
-    qemu_spin_unlock(&b->lock);
+    qht_bucket_unlock(map, b);
 
     /* we raced with a resize; acquire ht->lock to see the updated ht->map */
     qht_lock(ht);
     map = ht->map;
     b = qht_map_to_bucket(map, hash);
-    qemu_spin_lock(&b->lock);
+    qht_bucket_lock(map, b);
     qht_unlock(ht);
     *pmap = map;
     return b;
@@ -340,14 +407,17 @@ struct qht_bucket *qht_bucket_lock__no_stale(struct qht *ht, uint32_t hash,
 
 static inline bool qht_map_needs_resize(const struct qht_map *map)
 {
-    return atomic_read(&map->n_added_buckets) > map->n_added_buckets_threshold;
+    return qatomic_read(&map->n_added_buckets) >
+           map->n_added_buckets_threshold;
 }
 
-static inline void qht_chain_destroy(const struct qht_bucket *head)
+static inline void qht_chain_destroy(struct qht_map *map,
+                                     struct qht_bucket *head)
 {
     struct qht_bucket *curr = head->next;
     struct qht_bucket *prev;
 
+    qht_do_if_first_in_stripe(map, head, qemu_spin_destroy);
     while (curr) {
         prev = curr;
         curr = curr->next;
@@ -361,7 +431,7 @@ static void qht_map_destroy(struct qht_map *map)
     size_t i;
 
     for (i = 0; i < map->n_buckets; i++) {
-        qht_chain_destroy(&map->buckets[i]);
+        qht_chain_destroy(map, &map->buckets[i]);
     }
     qemu_vfree(map->buckets);
     g_free(map);
@@ -387,7 +457,7 @@ static struct qht_map *qht_map_create(size_t n_buckets)
     map->buckets = qemu_memalign(QHT_BUCKET_ALIGN,
                                  sizeof(*map->buckets) * n_buckets);
     for (i = 0; i < n_buckets; i++) {
-        qht_head_init(&map->buckets[i]);
+        qht_head_init(map, &map->buckets[i]);
     }
     return map;
 }
@@ -403,7 +473,7 @@ void qht_init(struct qht *ht, qht_cmp_func_t cmp, size_t n_elems,
     ht->mode = mode;
     qemu_mutex_init(&ht->lock);
     map = qht_map_create(n_buckets);
-    atomic_rcu_set(&ht->map, map);
+    qatomic_rcu_set(&ht->map, map);
 }
 
 /* call only when there are no readers/writers left */
@@ -424,8 +494,8 @@ static void qht_bucket_reset__locked(struct qht_bucket *head)
             if (b->pointers[i] == NULL) {
                 goto done;
             }
-            atomic_set(&b->hashes[i], 0);
-            atomic_set(&b->pointers[i], NULL);
+            qatomic_set(&b->hashes[i], 0);
+            qatomic_set(&b->pointers[i], NULL);
         }
         b = b->next;
     } while (b);
@@ -491,19 +561,19 @@ void *qht_do_lookup(const struct qht_bucket *head, qht_lookup_func_t func,
 
     do {
         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
-            if (atomic_read(&b->hashes[i]) == hash) {
+            if (qatomic_read(&b->hashes[i]) == hash) {
                 /* The pointer is dereferenced before seqlock_read_retry,
                  * so (unlike qht_insert__locked) we need to use
-                 * atomic_rcu_read here.
+                 * qatomic_rcu_read here.
                  */
-                void *p = atomic_rcu_read(&b->pointers[i]);
+                void *p = qatomic_rcu_read(&b->pointers[i]);
 
                 if (likely(p) && likely(func(p, userp))) {
                     return p;
                 }
             }
         }
-        b = atomic_rcu_read(&b->next);
+        b = qatomic_rcu_read(&b->next);
     } while (b);
 
     return NULL;
@@ -531,7 +601,7 @@ void *qht_lookup_custom(const struct qht *ht, const void *userp, uint32_t hash,
     unsigned int version;
     void *ret;
 
-    map = atomic_rcu_read(&ht->map);
+    map = qatomic_rcu_read(&ht->map);
     b = qht_map_to_bucket(map, hash);
 
     version = seqlock_read_begin(&b->sequence);
@@ -583,7 +653,7 @@ static void *qht_insert__locked(const struct qht *ht, struct qht_map *map,
     memset(b, 0, sizeof(*b));
     new = b;
     i = 0;
-    atomic_inc(&map->n_added_buckets);
+    qatomic_inc(&map->n_added_buckets);
     if (unlikely(qht_map_needs_resize(map)) && needs_resize) {
         *needs_resize = true;
     }
@@ -592,11 +662,11 @@ static void *qht_insert__locked(const struct qht *ht, struct qht_map *map,
     /* found an empty key: acquire the seqlock and write */
     seqlock_write_begin(&head->sequence);
     if (new) {
-        atomic_rcu_set(&prev->next, b);
+        qatomic_rcu_set(&prev->next, b);
     }
     /* smp_wmb() implicit in seqlock_write_begin.  */
-    atomic_set(&b->hashes[i], hash);
-    atomic_set(&b->pointers[i], p);
+    qatomic_set(&b->hashes[i], hash);
+    qatomic_set(&b->pointers[i], p);
     seqlock_write_end(&head->sequence);
     return NULL;
 }
@@ -635,7 +705,7 @@ bool qht_insert(struct qht *ht, void *p, uint32_t hash, void **existing)
     b = qht_bucket_lock__no_stale(ht, hash, &map);
     prev = qht_insert__locked(ht, map, b, p, hash, &needs_resize);
     qht_bucket_debug__locked(b);
-    qemu_spin_unlock(&b->lock);
+    qht_bucket_unlock(map, b);
 
     if (unlikely(needs_resize) && ht->mode & QHT_MODE_AUTO_RESIZE) {
         qht_grow_maybe(ht);
@@ -667,11 +737,11 @@ qht_entry_move(struct qht_bucket *to, int i, struct qht_bucket *from, int j)
     qht_debug_assert(to->pointers[i]);
     qht_debug_assert(from->pointers[j]);
 
-    atomic_set(&to->hashes[i], from->hashes[j]);
-    atomic_set(&to->pointers[i], from->pointers[j]);
+    qatomic_set(&to->hashes[i], from->hashes[j]);
+    qatomic_set(&to->pointers[i], from->pointers[j]);
 
-    atomic_set(&from->hashes[j], 0);
-    atomic_set(&from->pointers[j], NULL);
+    qatomic_set(&from->hashes[j], 0);
+    qatomic_set(&from->pointers[j], NULL);
 }
 
 /*
@@ -685,8 +755,8 @@ static inline void qht_bucket_remove_entry(struct qht_bucket *orig, int pos)
     int i;
 
     if (qht_entry_is_last(orig, pos)) {
-        orig->hashes[pos] = 0;
-        atomic_set(&orig->pointers[pos], NULL);
+        qatomic_set(&orig->hashes[pos], 0);
+        qatomic_set(&orig->pointers[pos], NULL);
         return;
     }
     do {
@@ -746,7 +816,7 @@ bool qht_remove(struct qht *ht, const void *p, uint32_t hash)
     b = qht_bucket_lock__no_stale(ht, hash, &map);
     ret = qht_remove__locked(b, p, hash);
     qht_bucket_debug__locked(b);
-    qemu_spin_unlock(&b->lock);
+    qht_bucket_unlock(map, b);
     return ret;
 }
 
@@ -802,7 +872,7 @@ do_qht_iter(struct qht *ht, const struct qht_iter *iter, void *userp)
 {
     struct qht_map *map;
 
-    map = atomic_rcu_read(&ht->map);
+    map = qatomic_rcu_read(&ht->map);
     qht_map_lock_buckets(map);
     qht_map_iter__all_locked(map, iter, userp);
     qht_map_unlock_buckets(map);
@@ -875,7 +945,7 @@ static void qht_do_resize_reset(struct qht *ht, struct qht_map *new, bool reset)
     qht_map_iter__all_locked(old, &iter, &data);
     qht_map_debug__all_locked(new);
 
-    atomic_rcu_set(&ht->map, new);
+    qatomic_rcu_set(&ht->map, new);
     qht_map_unlock_buckets(old);
     call_rcu(old, qht_map_destroy, rcu);
 }
@@ -904,7 +974,7 @@ void qht_statistics_init(const struct qht *ht, struct qht_stats *stats)
     const struct qht_map *map;
     int i;
 
-    map = atomic_rcu_read(&ht->map);
+    map = qatomic_rcu_read(&ht->map);
 
     stats->used_head_buckets = 0;
     stats->entries = 0;
@@ -932,13 +1002,13 @@ void qht_statistics_init(const struct qht *ht, struct qht_stats *stats)
             b = head;
             do {
                 for (j = 0; j < QHT_BUCKET_ENTRIES; j++) {
-                    if (atomic_read(&b->pointers[j]) == NULL) {
+                    if (qatomic_read(&b->pointers[j]) == NULL) {
                         break;
                     }
                     entries++;
                 }
                 buckets++;
-                b = atomic_rcu_read(&b->next);
+                b = qatomic_rcu_read(&b->next);
             } while (b);
         } while (seqlock_read_retry(&head->sequence, version));