]> git.proxmox.com Git - mirror_qemu.git/blobdiff - block/throttle-groups.c
hw/acpi: Consolidate build_mcfg to pci.c
[mirror_qemu.git] / block / throttle-groups.c
index 9ac063a0cddf2b1c35a3989af2ec148236201eb3..a5a20379244e20a7808b7d106972e36d29cbb60f 100644 (file)
  */
 
 #include "qemu/osdep.h"
+#include "sysemu/block-backend.h"
 #include "block/throttle-groups.h"
+#include "qemu/throttle-options.h"
 #include "qemu/queue.h"
 #include "qemu/thread.h"
 #include "sysemu/qtest.h"
+#include "qapi/error.h"
+#include "qapi/qapi-visit-block-core.h"
+#include "qom/object.h"
+#include "qom/object_interfaces.h"
+
+static void throttle_group_obj_init(Object *obj);
+static void throttle_group_obj_complete(UserCreatable *obj, Error **errp);
+static void timer_cb(ThrottleGroupMember *tgm, bool is_write);
 
 /* The ThrottleGroup structure (with its ThrottleState) is shared
- * among different BlockDriverState and it's independent from
+ * among different ThrottleGroupMembers and it's independent from
  * AioContext, so in order to use it from different threads it needs
  * its own locking.
  *
  * The whole ThrottleGroup structure is private and invisible to
  * outside users, that only use it through its ThrottleState.
  *
- * In addition to the ThrottleGroup structure, BlockDriverState has
+ * In addition to the ThrottleGroup structure, ThrottleGroupMember has
  * fields that need to be accessed by other members of the group and
- * therefore also need to be protected by this lock. Once a BDS is
- * registered in a group those fields can be accessed by other threads
- * any time.
+ * therefore also need to be protected by this lock. Once a
+ * ThrottleGroupMember is registered in a group those fields can be accessed
+ * by other threads any time.
  *
  * Again, all this is handled internally and is mostly transparent to
  * the outside. The 'throttle_timers' field however has an additional
  * constraint because it may be temporarily invalid (see for example
- * bdrv_set_aio_context()). Therefore in this file a thread will
- * access some other BDS's timers only after verifying that that BDS
- * has throttled requests in the queue.
+ * blk_set_aio_context()). Therefore in this file a thread will
+ * access some other ThrottleGroupMember's timers only after verifying that
+ * that ThrottleGroupMember has throttled requests in the queue.
  */
 typedef struct ThrottleGroup {
+    Object parent_obj;
+
+    /* refuse individual property change if initialization is complete */
+    bool is_initialized;
     char *name; /* This is constant during the lifetime of the group */
 
     QemuMutex lock; /* This lock protects the following four fields */
     ThrottleState ts;
-    QLIST_HEAD(, BlockDriverState) head;
-    BlockDriverState *tokens[2];
+    QLIST_HEAD(, ThrottleGroupMember) head;
+    ThrottleGroupMember *tokens[2];
     bool any_timer_armed[2];
+    QEMUClockType clock_type;
 
-    /* These two are protected by the global throttle_groups_lock */
-    unsigned refcount;
+    /* This field is protected by the global QEMU mutex */
     QTAILQ_ENTRY(ThrottleGroup) list;
 } ThrottleGroup;
 
-static QemuMutex throttle_groups_lock;
+/* This is protected by the global QEMU mutex */
 static QTAILQ_HEAD(, ThrottleGroup) throttle_groups =
     QTAILQ_HEAD_INITIALIZER(throttle_groups);
 
+
+/* This function reads throttle_groups and must be called under the global
+ * mutex.
+ */
+static ThrottleGroup *throttle_group_by_name(const char *name)
+{
+    ThrottleGroup *iter;
+
+    /* Look for an existing group with that name */
+    QTAILQ_FOREACH(iter, &throttle_groups, list) {
+        if (!g_strcmp0(name, iter->name)) {
+            return iter;
+        }
+    }
+
+    return NULL;
+}
+
+/* This function reads throttle_groups and must be called under the global
+ * mutex.
+ */
+bool throttle_group_exists(const char *name)
+{
+    return throttle_group_by_name(name) != NULL;
+}
+
 /* Increments the reference count of a ThrottleGroup given its name.
  *
  * If no ThrottleGroup is found with the given name a new one is
  * created.
  *
+ * This function edits throttle_groups and must be called under the global
+ * mutex.
+ *
  * @name: the name of the ThrottleGroup
  * @ret:  the ThrottleState member of the ThrottleGroup
  */
 ThrottleState *throttle_group_incref(const char *name)
 {
     ThrottleGroup *tg = NULL;
-    ThrottleGroup *iter;
-
-    qemu_mutex_lock(&throttle_groups_lock);
 
     /* Look for an existing group with that name */
-    QTAILQ_FOREACH(iter, &throttle_groups, list) {
-        if (!strcmp(name, iter->name)) {
-            tg = iter;
-            break;
-        }
-    }
-
-    /* Create a new one if not found */
-    if (!tg) {
-        tg = g_new0(ThrottleGroup, 1);
+    tg = throttle_group_by_name(name);
+
+    if (tg) {
+        object_ref(OBJECT(tg));
+    } else {
+        /* Create a new one if not found */
+        /* new ThrottleGroup obj will have a refcnt = 1 */
+        tg = THROTTLE_GROUP(object_new(TYPE_THROTTLE_GROUP));
         tg->name = g_strdup(name);
-        qemu_mutex_init(&tg->lock);
-        throttle_init(&tg->ts);
-        QLIST_INIT(&tg->head);
-
-        QTAILQ_INSERT_TAIL(&throttle_groups, tg, list);
+        throttle_group_obj_complete(USER_CREATABLE(tg), &error_abort);
     }
 
-    tg->refcount++;
-
-    qemu_mutex_unlock(&throttle_groups_lock);
-
     return &tg->ts;
 }
 
@@ -116,110 +146,132 @@ ThrottleState *throttle_group_incref(const char *name)
  * When the reference count reaches zero the ThrottleGroup is
  * destroyed.
  *
+ * This function edits throttle_groups and must be called under the global
+ * mutex.
+ *
  * @ts:  The ThrottleGroup to unref, given by its ThrottleState member
  */
 void throttle_group_unref(ThrottleState *ts)
 {
     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
-
-    qemu_mutex_lock(&throttle_groups_lock);
-    if (--tg->refcount == 0) {
-        QTAILQ_REMOVE(&throttle_groups, tg, list);
-        qemu_mutex_destroy(&tg->lock);
-        g_free(tg->name);
-        g_free(tg);
-    }
-    qemu_mutex_unlock(&throttle_groups_lock);
+    object_unref(OBJECT(tg));
 }
 
-/* Get the name from a BlockDriverState's ThrottleGroup. The name (and
- * the pointer) is guaranteed to remain constant during the lifetime
- * of the group.
+/* Get the name from a ThrottleGroupMember's group. The name (and the pointer)
+ * is guaranteed to remain constant during the lifetime of the group.
  *
- * @bs:   a BlockDriverState that is member of a throttling group
+ * @tgm:  a ThrottleGroupMember
  * @ret:  the name of the group.
  */
-const char *throttle_group_get_name(BlockDriverState *bs)
+const char *throttle_group_get_name(ThrottleGroupMember *tgm)
 {
-    ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
+    ThrottleGroup *tg = container_of(tgm->throttle_state, ThrottleGroup, ts);
     return tg->name;
 }
 
-/* Return the next BlockDriverState in the round-robin sequence,
- * simulating a circular list.
+/* Return the next ThrottleGroupMember in the round-robin sequence, simulating
+ * a circular list.
  *
  * This assumes that tg->lock is held.
  *
- * @bs:  the current BlockDriverState
- * @ret: the next BlockDriverState in the sequence
+ * @tgm: the current ThrottleGroupMember
+ * @ret: the next ThrottleGroupMember in the sequence
  */
-static BlockDriverState *throttle_group_next_bs(BlockDriverState *bs)
+static ThrottleGroupMember *throttle_group_next_tgm(ThrottleGroupMember *tgm)
 {
-    ThrottleState *ts = bs->throttle_state;
+    ThrottleState *ts = tgm->throttle_state;
     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
-    BlockDriverState *next = QLIST_NEXT(bs, round_robin);
+    ThrottleGroupMember *next = QLIST_NEXT(tgm, round_robin);
 
     if (!next) {
-        return QLIST_FIRST(&tg->head);
+        next = QLIST_FIRST(&tg->head);
     }
 
     return next;
 }
 
-/* Return the next BlockDriverState in the round-robin sequence with
- * pending I/O requests.
+/*
+ * Return whether a ThrottleGroupMember has pending requests.
  *
  * This assumes that tg->lock is held.
  *
- * @bs:        the current BlockDriverState
+ * @tgm:        the ThrottleGroupMember
+ * @is_write:   the type of operation (read/write)
+ * @ret:        whether the ThrottleGroupMember has pending requests.
+ */
+static inline bool tgm_has_pending_reqs(ThrottleGroupMember *tgm,
+                                        bool is_write)
+{
+    return tgm->pending_reqs[is_write];
+}
+
+/* Return the next ThrottleGroupMember in the round-robin sequence with pending
+ * I/O requests.
+ *
+ * This assumes that tg->lock is held.
+ *
+ * @tgm:       the current ThrottleGroupMember
  * @is_write:  the type of operation (read/write)
- * @ret:       the next BlockDriverState with pending requests, or bs
- *             if there is none.
+ * @ret:       the next ThrottleGroupMember with pending requests, or tgm if
+ *             there is none.
  */
-static BlockDriverState *next_throttle_token(BlockDriverState *bs,
-                                             bool is_write)
+static ThrottleGroupMember *next_throttle_token(ThrottleGroupMember *tgm,
+                                                bool is_write)
 {
-    ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
-    BlockDriverState *token, *start;
+    ThrottleState *ts = tgm->throttle_state;
+    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
+    ThrottleGroupMember *token, *start;
+
+    /* If this member has its I/O limits disabled then it means that
+     * it's being drained. Skip the round-robin search and return tgm
+     * immediately if it has pending requests. Otherwise we could be
+     * forcing it to wait for other member's throttled requests. */
+    if (tgm_has_pending_reqs(tgm, is_write) &&
+        atomic_read(&tgm->io_limits_disabled)) {
+        return tgm;
+    }
 
     start = token = tg->tokens[is_write];
 
     /* get next bs round in round robin style */
-    token = throttle_group_next_bs(token);
-    while (token != start && !token->pending_reqs[is_write]) {
-        token = throttle_group_next_bs(token);
+    token = throttle_group_next_tgm(token);
+    while (token != start && !tgm_has_pending_reqs(token, is_write)) {
+        token = throttle_group_next_tgm(token);
     }
 
     /* If no IO are queued for scheduling on the next round robin token
-     * then decide the token is the current bs because chances are
-     * the current bs get the current request queued.
+     * then decide the token is the current tgm because chances are
+     * the current tgm got the current request queued.
      */
-    if (token == start && !token->pending_reqs[is_write]) {
-        token = bs;
+    if (token == start && !tgm_has_pending_reqs(token, is_write)) {
+        token = tgm;
     }
 
+    /* Either we return the original TGM, or one with pending requests */
+    assert(token == tgm || tgm_has_pending_reqs(token, is_write));
+
     return token;
 }
 
-/* Check if the next I/O request for a BlockDriverState needs to be
- * throttled or not. If there's no timer set in this group, set one
- * and update the token accordingly.
+/* Check if the next I/O request for a ThrottleGroupMember needs to be
+ * throttled or not. If there's no timer set in this group, set one and update
+ * the token accordingly.
  *
  * This assumes that tg->lock is held.
  *
- * @bs:         the current BlockDriverState
+ * @tgm:        the current ThrottleGroupMember
  * @is_write:   the type of operation (read/write)
  * @ret:        whether the I/O request needs to be throttled or not
  */
-static bool throttle_group_schedule_timer(BlockDriverState *bs,
+static bool throttle_group_schedule_timer(ThrottleGroupMember *tgm,
                                           bool is_write)
 {
-    ThrottleState *ts = bs->throttle_state;
-    ThrottleTimers *tt = &bs->throttle_timers;
+    ThrottleState *ts = tgm->throttle_state;
     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
+    ThrottleTimers *tt = &tgm->throttle_timers;
     bool must_wait;
 
-    if (bs->io_limits_disabled) {
+    if (atomic_read(&tgm->io_limits_disabled)) {
         return false;
     }
 
@@ -230,31 +282,50 @@ static bool throttle_group_schedule_timer(BlockDriverState *bs,
 
     must_wait = throttle_schedule_timer(ts, tt, is_write);
 
-    /* If a timer just got armed, set bs as the current token */
+    /* If a timer just got armed, set tgm as the current token */
     if (must_wait) {
-        tg->tokens[is_write] = bs;
+        tg->tokens[is_write] = tgm;
         tg->any_timer_armed[is_write] = true;
     }
 
     return must_wait;
 }
 
+/* Start the next pending I/O request for a ThrottleGroupMember. Return whether
+ * any request was actually pending.
+ *
+ * @tgm:       the current ThrottleGroupMember
+ * @is_write:  the type of operation (read/write)
+ */
+static bool coroutine_fn throttle_group_co_restart_queue(ThrottleGroupMember *tgm,
+                                                         bool is_write)
+{
+    bool ret;
+
+    qemu_co_mutex_lock(&tgm->throttled_reqs_lock);
+    ret = qemu_co_queue_next(&tgm->throttled_reqs[is_write]);
+    qemu_co_mutex_unlock(&tgm->throttled_reqs_lock);
+
+    return ret;
+}
+
 /* Look for the next pending I/O request and schedule it.
  *
  * This assumes that tg->lock is held.
  *
- * @bs:        the current BlockDriverState
+ * @tgm:       the current ThrottleGroupMember
  * @is_write:  the type of operation (read/write)
  */
-static void schedule_next_request(BlockDriverState *bs, bool is_write)
+static void schedule_next_request(ThrottleGroupMember *tgm, bool is_write)
 {
-    ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
+    ThrottleState *ts = tgm->throttle_state;
+    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
     bool must_wait;
-    BlockDriverState *token;
+    ThrottleGroupMember *token;
 
     /* Check if there's any pending request to schedule next */
-    token = next_throttle_token(bs, is_write);
-    if (!token->pending_reqs[is_write]) {
+    token = next_throttle_token(tgm, is_write);
+    if (!tgm_has_pending_reqs(token, is_write)) {
         return;
     }
 
@@ -263,14 +334,14 @@ static void schedule_next_request(BlockDriverState *bs, bool is_write)
 
     /* If it doesn't have to wait, queue it for immediate execution */
     if (!must_wait) {
-        /* Give preference to requests from the current bs */
+        /* Give preference to requests from the current tgm */
         if (qemu_in_coroutine() &&
-            qemu_co_queue_next(&bs->throttled_reqs[is_write])) {
-            token = bs;
+            throttle_group_co_restart_queue(tgm, is_write)) {
+            token = tgm;
         } else {
             ThrottleTimers *tt = &token->throttle_timers;
-            int64_t now = qemu_clock_get_ns(tt->clock_type);
-            timer_mod(tt->timers[is_write], now + 1);
+            int64_t now = qemu_clock_get_ns(tg->clock_type);
+            timer_mod(tt->timers[is_write], now);
             tg->any_timer_armed[is_write] = true;
         }
         tg->tokens[is_write] = token;
@@ -281,49 +352,108 @@ static void schedule_next_request(BlockDriverState *bs, bool is_write)
  * if necessary, and schedule the next request using a round robin
  * algorithm.
  *
- * @bs:        the current BlockDriverState
+ * @tgm:       the current ThrottleGroupMember
  * @bytes:     the number of bytes for this I/O
  * @is_write:  the type of operation (read/write)
  */
-void coroutine_fn throttle_group_co_io_limits_intercept(BlockDriverState *bs,
+void coroutine_fn throttle_group_co_io_limits_intercept(ThrottleGroupMember *tgm,
                                                         unsigned int bytes,
                                                         bool is_write)
 {
     bool must_wait;
-    BlockDriverState *token;
-
-    ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
+    ThrottleGroupMember *token;
+    ThrottleGroup *tg = container_of(tgm->throttle_state, ThrottleGroup, ts);
     qemu_mutex_lock(&tg->lock);
 
     /* First we check if this I/O has to be throttled. */
-    token = next_throttle_token(bs, is_write);
+    token = next_throttle_token(tgm, is_write);
     must_wait = throttle_group_schedule_timer(token, is_write);
 
     /* Wait if there's a timer set or queued requests of this type */
-    if (must_wait || bs->pending_reqs[is_write]) {
-        bs->pending_reqs[is_write]++;
+    if (must_wait || tgm->pending_reqs[is_write]) {
+        tgm->pending_reqs[is_write]++;
         qemu_mutex_unlock(&tg->lock);
-        qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
+        qemu_co_mutex_lock(&tgm->throttled_reqs_lock);
+        qemu_co_queue_wait(&tgm->throttled_reqs[is_write],
+                           &tgm->throttled_reqs_lock);
+        qemu_co_mutex_unlock(&tgm->throttled_reqs_lock);
         qemu_mutex_lock(&tg->lock);
-        bs->pending_reqs[is_write]--;
+        tgm->pending_reqs[is_write]--;
     }
 
     /* The I/O will be executed, so do the accounting */
-    throttle_account(bs->throttle_state, is_write, bytes);
+    throttle_account(tgm->throttle_state, is_write, bytes);
 
     /* Schedule the next request */
-    schedule_next_request(bs, is_write);
+    schedule_next_request(tgm, is_write);
 
     qemu_mutex_unlock(&tg->lock);
 }
 
-void throttle_group_restart_bs(BlockDriverState *bs)
+typedef struct {
+    ThrottleGroupMember *tgm;
+    bool is_write;
+} RestartData;
+
+static void coroutine_fn throttle_group_restart_queue_entry(void *opaque)
+{
+    RestartData *data = opaque;
+    ThrottleGroupMember *tgm = data->tgm;
+    ThrottleState *ts = tgm->throttle_state;
+    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
+    bool is_write = data->is_write;
+    bool empty_queue;
+
+    empty_queue = !throttle_group_co_restart_queue(tgm, is_write);
+
+    /* If the request queue was empty then we have to take care of
+     * scheduling the next one */
+    if (empty_queue) {
+        qemu_mutex_lock(&tg->lock);
+        schedule_next_request(tgm, is_write);
+        qemu_mutex_unlock(&tg->lock);
+    }
+
+    g_free(data);
+
+    atomic_dec(&tgm->restart_pending);
+    aio_wait_kick();
+}
+
+static void throttle_group_restart_queue(ThrottleGroupMember *tgm, bool is_write)
+{
+    Coroutine *co;
+    RestartData *rd = g_new0(RestartData, 1);
+
+    rd->tgm = tgm;
+    rd->is_write = is_write;
+
+    /* This function is called when a timer is fired or when
+     * throttle_group_restart_tgm() is called. Either way, there can
+     * be no timer pending on this tgm at this point */
+    assert(!timer_pending(tgm->throttle_timers.timers[is_write]));
+
+    atomic_inc(&tgm->restart_pending);
+
+    co = qemu_coroutine_create(throttle_group_restart_queue_entry, rd);
+    aio_co_enter(tgm->aio_context, co);
+}
+
+void throttle_group_restart_tgm(ThrottleGroupMember *tgm)
 {
     int i;
 
-    for (i = 0; i < 2; i++) {
-        while (qemu_co_enter_next(&bs->throttled_reqs[i])) {
-            ;
+    if (tgm->throttle_state) {
+        for (i = 0; i < 2; i++) {
+            QEMUTimer *t = tgm->throttle_timers.timers[i];
+            if (timer_pending(t)) {
+                /* If there's a pending timer on this tgm, fire it now */
+                timer_del(t);
+                timer_cb(tgm, i);
+            } else {
+                /* Else run the next request from the queue manually */
+                throttle_group_restart_queue(tgm, i);
+            }
         }
     }
 }
@@ -332,39 +462,30 @@ void throttle_group_restart_bs(BlockDriverState *bs)
  * to throttle_config(), but guarantees atomicity within the
  * throttling group.
  *
- * @bs:  a BlockDriverState that is member of the group
+ * @tgm:    a ThrottleGroupMember that is a member of the group
  * @cfg: the configuration to set
  */
-void throttle_group_config(BlockDriverState *bs, ThrottleConfig *cfg)
+void throttle_group_config(ThrottleGroupMember *tgm, ThrottleConfig *cfg)
 {
-    ThrottleTimers *tt = &bs->throttle_timers;
-    ThrottleState *ts = bs->throttle_state;
+    ThrottleState *ts = tgm->throttle_state;
     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
     qemu_mutex_lock(&tg->lock);
-    /* throttle_config() cancels the timers */
-    if (timer_pending(tt->timers[0])) {
-        tg->any_timer_armed[0] = false;
-    }
-    if (timer_pending(tt->timers[1])) {
-        tg->any_timer_armed[1] = false;
-    }
-    throttle_config(ts, tt, cfg);
+    throttle_config(ts, tg->clock_type, cfg);
     qemu_mutex_unlock(&tg->lock);
 
-    qemu_co_enter_next(&bs->throttled_reqs[0]);
-    qemu_co_enter_next(&bs->throttled_reqs[1]);
+    throttle_group_restart_tgm(tgm);
 }
 
 /* Get the throttle configuration from a particular group. Similar to
  * throttle_get_config(), but guarantees atomicity within the
  * throttling group.
  *
- * @bs:  a BlockDriverState that is member of the group
+ * @tgm:    a ThrottleGroupMember that is a member of the group
  * @cfg: the configuration will be written here
  */
-void throttle_group_get_config(BlockDriverState *bs, ThrottleConfig *cfg)
+void throttle_group_get_config(ThrottleGroupMember *tgm, ThrottleConfig *cfg)
 {
-    ThrottleState *ts = bs->throttle_state;
+    ThrottleState *ts = tgm->throttle_state;
     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
     qemu_mutex_lock(&tg->lock);
     throttle_get_config(ts, cfg);
@@ -374,14 +495,13 @@ void throttle_group_get_config(BlockDriverState *bs, ThrottleConfig *cfg)
 /* ThrottleTimers callback. This wakes up a request that was waiting
  * because it had been throttled.
  *
- * @bs:        the BlockDriverState whose request had been throttled
+ * @tgm:       the ThrottleGroupMember whose request had been throttled
  * @is_write:  the type of operation (read/write)
  */
-static void timer_cb(BlockDriverState *bs, bool is_write)
+static void timer_cb(ThrottleGroupMember *tgm, bool is_write)
 {
-    ThrottleState *ts = bs->throttle_state;
+    ThrottleState *ts = tgm->throttle_state;
     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
-    bool empty_queue;
 
     /* The timer has just been fired, so we can update the flag */
     qemu_mutex_lock(&tg->lock);
@@ -389,15 +509,7 @@ static void timer_cb(BlockDriverState *bs, bool is_write)
     qemu_mutex_unlock(&tg->lock);
 
     /* Run the request that was waiting for this timer */
-    empty_queue = !qemu_co_enter_next(&bs->throttled_reqs[is_write]);
-
-    /* If the request queue was empty then we have to take care of
-     * scheduling the next one */
-    if (empty_queue) {
-        qemu_mutex_lock(&tg->lock);
-        schedule_next_request(bs, is_write);
-        qemu_mutex_unlock(&tg->lock);
-    }
+    throttle_group_restart_queue(tgm, is_write);
 }
 
 static void read_timer_cb(void *opaque)
@@ -410,92 +522,466 @@ static void write_timer_cb(void *opaque)
     timer_cb(opaque, true);
 }
 
-/* Register a BlockDriverState in the throttling group, also
- * initializing its timers and updating its throttle_state pointer to
- * point to it. If a throttling group with that name does not exist
- * yet, it will be created.
+/* Register a ThrottleGroupMember from the throttling group, also initializing
+ * its timers and updating its throttle_state pointer to point to it. If a
+ * throttling group with that name does not exist yet, it will be created.
+ *
+ * This function edits throttle_groups and must be called under the global
+ * mutex.
  *
- * @bs:        the BlockDriverState to insert
+ * @tgm:       the ThrottleGroupMember to insert
  * @groupname: the name of the group
+ * @ctx:       the AioContext to use
  */
-void throttle_group_register_bs(BlockDriverState *bs, const char *groupname)
+void throttle_group_register_tgm(ThrottleGroupMember *tgm,
+                                 const char *groupname,
+                                 AioContext *ctx)
 {
     int i;
     ThrottleState *ts = throttle_group_incref(groupname);
     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
-    int clock_type = QEMU_CLOCK_REALTIME;
 
-    if (qtest_enabled()) {
-        /* For testing block IO throttling only */
-        clock_type = QEMU_CLOCK_VIRTUAL;
-    }
-
-    bs->throttle_state = ts;
+    tgm->throttle_state = ts;
+    tgm->aio_context = ctx;
+    atomic_set(&tgm->restart_pending, 0);
 
     qemu_mutex_lock(&tg->lock);
-    /* If the ThrottleGroup is new set this BlockDriverState as the token */
+    /* If the ThrottleGroup is new set this ThrottleGroupMember as the token */
     for (i = 0; i < 2; i++) {
         if (!tg->tokens[i]) {
-            tg->tokens[i] = bs;
+            tg->tokens[i] = tgm;
         }
     }
 
-    QLIST_INSERT_HEAD(&tg->head, bs, round_robin);
+    QLIST_INSERT_HEAD(&tg->head, tgm, round_robin);
 
-    throttle_timers_init(&bs->throttle_timers,
-                         bdrv_get_aio_context(bs),
-                         clock_type,
+    throttle_timers_init(&tgm->throttle_timers,
+                         tgm->aio_context,
+                         tg->clock_type,
                          read_timer_cb,
                          write_timer_cb,
-                         bs);
+                         tgm);
+    qemu_co_mutex_init(&tgm->throttled_reqs_lock);
+    qemu_co_queue_init(&tgm->throttled_reqs[0]);
+    qemu_co_queue_init(&tgm->throttled_reqs[1]);
 
     qemu_mutex_unlock(&tg->lock);
 }
 
-/* Unregister a BlockDriverState from its group, removing it from the
- * list, destroying the timers and setting the throttle_state pointer
- * to NULL.
+/* Unregister a ThrottleGroupMember from its group, removing it from the list,
+ * destroying the timers and setting the throttle_state pointer to NULL.
  *
- * The BlockDriverState must not have pending throttled requests, so
- * the caller has to drain them first.
+ * The ThrottleGroupMember must not have pending throttled requests, so the
+ * caller has to drain them first.
  *
  * The group will be destroyed if it's empty after this operation.
  *
- * @bs: the BlockDriverState to remove
+ * @tgm the ThrottleGroupMember to remove
  */
-void throttle_group_unregister_bs(BlockDriverState *bs)
+void throttle_group_unregister_tgm(ThrottleGroupMember *tgm)
 {
-    ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
+    ThrottleState *ts = tgm->throttle_state;
+    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
+    ThrottleGroupMember *token;
     int i;
 
-    assert(bs->pending_reqs[0] == 0 && bs->pending_reqs[1] == 0);
-    assert(qemu_co_queue_empty(&bs->throttled_reqs[0]));
-    assert(qemu_co_queue_empty(&bs->throttled_reqs[1]));
+    if (!ts) {
+        /* Discard already unregistered tgm */
+        return;
+    }
+
+    /* Wait for throttle_group_restart_queue_entry() coroutines to finish */
+    AIO_WAIT_WHILE(tgm->aio_context, atomic_read(&tgm->restart_pending) > 0);
 
     qemu_mutex_lock(&tg->lock);
     for (i = 0; i < 2; i++) {
-        if (tg->tokens[i] == bs) {
-            BlockDriverState *token = throttle_group_next_bs(bs);
-            /* Take care of the case where this is the last bs in the group */
-            if (token == bs) {
+        assert(tgm->pending_reqs[i] == 0);
+        assert(qemu_co_queue_empty(&tgm->throttled_reqs[i]));
+        assert(!timer_pending(tgm->throttle_timers.timers[i]));
+        if (tg->tokens[i] == tgm) {
+            token = throttle_group_next_tgm(tgm);
+            /* Take care of the case where this is the last tgm in the group */
+            if (token == tgm) {
                 token = NULL;
             }
             tg->tokens[i] = token;
         }
     }
 
-    /* remove the current bs from the list */
-    QLIST_REMOVE(bs, round_robin);
-    throttle_timers_destroy(&bs->throttle_timers);
+    /* remove the current tgm from the list */
+    QLIST_REMOVE(tgm, round_robin);
+    throttle_timers_destroy(&tgm->throttle_timers);
     qemu_mutex_unlock(&tg->lock);
 
     throttle_group_unref(&tg->ts);
-    bs->throttle_state = NULL;
+    tgm->throttle_state = NULL;
+}
+
+void throttle_group_attach_aio_context(ThrottleGroupMember *tgm,
+                                       AioContext *new_context)
+{
+    ThrottleTimers *tt = &tgm->throttle_timers;
+    throttle_timers_attach_aio_context(tt, new_context);
+    tgm->aio_context = new_context;
+}
+
+void throttle_group_detach_aio_context(ThrottleGroupMember *tgm)
+{
+    ThrottleGroup *tg = container_of(tgm->throttle_state, ThrottleGroup, ts);
+    ThrottleTimers *tt = &tgm->throttle_timers;
+    int i;
+
+    /* Requests must have been drained */
+    assert(tgm->pending_reqs[0] == 0 && tgm->pending_reqs[1] == 0);
+    assert(qemu_co_queue_empty(&tgm->throttled_reqs[0]));
+    assert(qemu_co_queue_empty(&tgm->throttled_reqs[1]));
+
+    /* Kick off next ThrottleGroupMember, if necessary */
+    qemu_mutex_lock(&tg->lock);
+    for (i = 0; i < 2; i++) {
+        if (timer_pending(tt->timers[i])) {
+            tg->any_timer_armed[i] = false;
+            schedule_next_request(tgm, i);
+        }
+    }
+    qemu_mutex_unlock(&tg->lock);
+
+    throttle_timers_detach_aio_context(tt);
+    tgm->aio_context = NULL;
+}
+
+#undef THROTTLE_OPT_PREFIX
+#define THROTTLE_OPT_PREFIX "x-"
+
+/* Helper struct and array for QOM property setter/getter */
+typedef struct {
+    const char *name;
+    BucketType type;
+    enum {
+        AVG,
+        MAX,
+        BURST_LENGTH,
+        IOPS_SIZE,
+    } category;
+} ThrottleParamInfo;
+
+static ThrottleParamInfo properties[] = {
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_TOTAL,
+        THROTTLE_OPS_TOTAL, AVG,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_TOTAL_MAX,
+        THROTTLE_OPS_TOTAL, MAX,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_TOTAL_MAX_LENGTH,
+        THROTTLE_OPS_TOTAL, BURST_LENGTH,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_READ,
+        THROTTLE_OPS_READ, AVG,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_READ_MAX,
+        THROTTLE_OPS_READ, MAX,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_READ_MAX_LENGTH,
+        THROTTLE_OPS_READ, BURST_LENGTH,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_WRITE,
+        THROTTLE_OPS_WRITE, AVG,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_WRITE_MAX,
+        THROTTLE_OPS_WRITE, MAX,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_WRITE_MAX_LENGTH,
+        THROTTLE_OPS_WRITE, BURST_LENGTH,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_BPS_TOTAL,
+        THROTTLE_BPS_TOTAL, AVG,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_BPS_TOTAL_MAX,
+        THROTTLE_BPS_TOTAL, MAX,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_BPS_TOTAL_MAX_LENGTH,
+        THROTTLE_BPS_TOTAL, BURST_LENGTH,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_BPS_READ,
+        THROTTLE_BPS_READ, AVG,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_BPS_READ_MAX,
+        THROTTLE_BPS_READ, MAX,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_BPS_READ_MAX_LENGTH,
+        THROTTLE_BPS_READ, BURST_LENGTH,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_BPS_WRITE,
+        THROTTLE_BPS_WRITE, AVG,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_BPS_WRITE_MAX,
+        THROTTLE_BPS_WRITE, MAX,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_BPS_WRITE_MAX_LENGTH,
+        THROTTLE_BPS_WRITE, BURST_LENGTH,
+    },
+    {
+        THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_SIZE,
+        0, IOPS_SIZE,
+    }
+};
+
+/* This function edits throttle_groups and must be called under the global
+ * mutex */
+static void throttle_group_obj_init(Object *obj)
+{
+    ThrottleGroup *tg = THROTTLE_GROUP(obj);
+
+    tg->clock_type = QEMU_CLOCK_REALTIME;
+    if (qtest_enabled()) {
+        /* For testing block IO throttling only */
+        tg->clock_type = QEMU_CLOCK_VIRTUAL;
+    }
+    tg->is_initialized = false;
+    qemu_mutex_init(&tg->lock);
+    throttle_init(&tg->ts);
+    QLIST_INIT(&tg->head);
+}
+
+/* This function edits throttle_groups and must be called under the global
+ * mutex */
+static void throttle_group_obj_complete(UserCreatable *obj, Error **errp)
+{
+    ThrottleGroup *tg = THROTTLE_GROUP(obj);
+    ThrottleConfig cfg;
+
+    /* set group name to object id if it exists */
+    if (!tg->name && tg->parent_obj.parent) {
+        tg->name = object_get_canonical_path_component(OBJECT(obj));
+    }
+    /* We must have a group name at this point */
+    assert(tg->name);
+
+    /* error if name is duplicate */
+    if (throttle_group_exists(tg->name)) {
+        error_setg(errp, "A group with this name already exists");
+        return;
+    }
+
+    /* check validity */
+    throttle_get_config(&tg->ts, &cfg);
+    if (!throttle_is_valid(&cfg, errp)) {
+        return;
+    }
+    throttle_config(&tg->ts, tg->clock_type, &cfg);
+    QTAILQ_INSERT_TAIL(&throttle_groups, tg, list);
+    tg->is_initialized = true;
+}
+
+/* This function edits throttle_groups and must be called under the global
+ * mutex */
+static void throttle_group_obj_finalize(Object *obj)
+{
+    ThrottleGroup *tg = THROTTLE_GROUP(obj);
+    if (tg->is_initialized) {
+        QTAILQ_REMOVE(&throttle_groups, tg, list);
+    }
+    qemu_mutex_destroy(&tg->lock);
+    g_free(tg->name);
 }
 
+static void throttle_group_set(Object *obj, Visitor *v, const char * name,
+                               void *opaque, Error **errp)
+
+{
+    ThrottleGroup *tg = THROTTLE_GROUP(obj);
+    ThrottleConfig *cfg;
+    ThrottleParamInfo *info = opaque;
+    Error *local_err = NULL;
+    int64_t value;
+
+    /* If we have finished initialization, don't accept individual property
+     * changes through QOM. Throttle configuration limits must be set in one
+     * transaction, as certain combinations are invalid.
+     */
+    if (tg->is_initialized) {
+        error_setg(&local_err, "Property cannot be set after initialization");
+        goto ret;
+    }
+
+    visit_type_int64(v, name, &value, &local_err);
+    if (local_err) {
+        goto ret;
+    }
+    if (value < 0) {
+        error_setg(&local_err, "Property values cannot be negative");
+        goto ret;
+    }
+
+    cfg = &tg->ts.cfg;
+    switch (info->category) {
+    case AVG:
+        cfg->buckets[info->type].avg = value;
+        break;
+    case MAX:
+        cfg->buckets[info->type].max = value;
+        break;
+    case BURST_LENGTH:
+        if (value > UINT_MAX) {
+            error_setg(&local_err, "%s value must be in the"
+                       "range [0, %u]", info->name, UINT_MAX);
+            goto ret;
+        }
+        cfg->buckets[info->type].burst_length = value;
+        break;
+    case IOPS_SIZE:
+        cfg->op_size = value;
+        break;
+    }
+
+ret:
+    error_propagate(errp, local_err);
+    return;
+
+}
+
+static void throttle_group_get(Object *obj, Visitor *v, const char *name,
+                               void *opaque, Error **errp)
+{
+    ThrottleGroup *tg = THROTTLE_GROUP(obj);
+    ThrottleConfig cfg;
+    ThrottleParamInfo *info = opaque;
+    int64_t value;
+
+    throttle_get_config(&tg->ts, &cfg);
+    switch (info->category) {
+    case AVG:
+        value = cfg.buckets[info->type].avg;
+        break;
+    case MAX:
+        value = cfg.buckets[info->type].max;
+        break;
+    case BURST_LENGTH:
+        value = cfg.buckets[info->type].burst_length;
+        break;
+    case IOPS_SIZE:
+        value = cfg.op_size;
+        break;
+    }
+
+    visit_type_int64(v, name, &value, errp);
+}
+
+static void throttle_group_set_limits(Object *obj, Visitor *v,
+                                      const char *name, void *opaque,
+                                      Error **errp)
+
+{
+    ThrottleGroup *tg = THROTTLE_GROUP(obj);
+    ThrottleConfig cfg;
+    ThrottleLimits arg = { 0 };
+    ThrottleLimits *argp = &arg;
+    Error *local_err = NULL;
+
+    visit_type_ThrottleLimits(v, name, &argp, &local_err);
+    if (local_err) {
+        goto ret;
+    }
+    qemu_mutex_lock(&tg->lock);
+    throttle_get_config(&tg->ts, &cfg);
+    throttle_limits_to_config(argp, &cfg, &local_err);
+    if (local_err) {
+        goto unlock;
+    }
+    throttle_config(&tg->ts, tg->clock_type, &cfg);
+
+unlock:
+    qemu_mutex_unlock(&tg->lock);
+ret:
+    error_propagate(errp, local_err);
+    return;
+}
+
+static void throttle_group_get_limits(Object *obj, Visitor *v,
+                                      const char *name, void *opaque,
+                                      Error **errp)
+{
+    ThrottleGroup *tg = THROTTLE_GROUP(obj);
+    ThrottleConfig cfg;
+    ThrottleLimits arg = { 0 };
+    ThrottleLimits *argp = &arg;
+
+    qemu_mutex_lock(&tg->lock);
+    throttle_get_config(&tg->ts, &cfg);
+    qemu_mutex_unlock(&tg->lock);
+
+    throttle_config_to_limits(&cfg, argp);
+
+    visit_type_ThrottleLimits(v, name, &argp, errp);
+}
+
+static bool throttle_group_can_be_deleted(UserCreatable *uc)
+{
+    return OBJECT(uc)->ref == 1;
+}
+
+static void throttle_group_obj_class_init(ObjectClass *klass, void *class_data)
+{
+    size_t i = 0;
+    UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
+
+    ucc->complete = throttle_group_obj_complete;
+    ucc->can_be_deleted = throttle_group_can_be_deleted;
+
+    /* individual properties */
+    for (i = 0; i < sizeof(properties) / sizeof(ThrottleParamInfo); i++) {
+        object_class_property_add(klass,
+                                  properties[i].name,
+                                  "int",
+                                  throttle_group_get,
+                                  throttle_group_set,
+                                  NULL, &properties[i],
+                                  &error_abort);
+    }
+
+    /* ThrottleLimits */
+    object_class_property_add(klass,
+                              "limits", "ThrottleLimits",
+                              throttle_group_get_limits,
+                              throttle_group_set_limits,
+                              NULL, NULL,
+                              &error_abort);
+}
+
+static const TypeInfo throttle_group_info = {
+    .name = TYPE_THROTTLE_GROUP,
+    .parent = TYPE_OBJECT,
+    .class_init = throttle_group_obj_class_init,
+    .instance_size = sizeof(ThrottleGroup),
+    .instance_init = throttle_group_obj_init,
+    .instance_finalize = throttle_group_obj_finalize,
+    .interfaces = (InterfaceInfo[]) {
+        { TYPE_USER_CREATABLE },
+        { }
+    },
+};
+
 static void throttle_groups_init(void)
 {
-    qemu_mutex_init(&throttle_groups_lock);
+    type_register_static(&throttle_group_info);
 }
 
-block_init(throttle_groups_init);
+type_init(throttle_groups_init);