QSIMPLEQ_INIT(&queue->entries);
}
-void coroutine_fn qemu_co_queue_wait(CoQueue *queue, CoMutex *mutex)
+void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock)
{
Coroutine *self = qemu_coroutine_self();
QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
- if (mutex) {
- qemu_co_mutex_unlock(mutex);
+ if (lock) {
+ qemu_lockable_unlock(lock);
}
/* There is no race condition here. Other threads will call
/* TODO: OSv implements wait morphing here, where the wakeup
* primitive automatically places the woken coroutine on the
* mutex's queue. This avoids the thundering herd effect.
+ * This could be implemented for CoMutexes, but not really for
+ * other cases of QemuLockable.
*/
- if (mutex) {
- qemu_co_mutex_lock(mutex);
- }
-}
-
-/**
- * qemu_co_queue_run_restart:
- *
- * Enter each coroutine that was previously marked for restart by
- * qemu_co_queue_next() or qemu_co_queue_restart_all(). This function is
- * invoked by the core coroutine code when the current coroutine yields or
- * terminates.
- */
-void qemu_co_queue_run_restart(Coroutine *co)
-{
- Coroutine *next;
- QSIMPLEQ_HEAD(, Coroutine) tmp_queue_wakeup =
- QSIMPLEQ_HEAD_INITIALIZER(tmp_queue_wakeup);
-
- trace_qemu_co_queue_run_restart(co);
-
- /* Because "co" has yielded, any coroutine that we wakeup can resume it.
- * If this happens and "co" terminates, co->co_queue_wakeup becomes
- * invalid memory. Therefore, use a temporary queue and do not touch
- * the "co" coroutine as soon as you enter another one.
- *
- * In its turn resumed "co" can pupulate "co_queue_wakeup" queue with
- * new coroutines to be woken up. The caller, who has resumed "co",
- * will be responsible for traversing the same queue, which may cause
- * a different wakeup order but not any missing wakeups.
- */
- QSIMPLEQ_CONCAT(&tmp_queue_wakeup, &co->co_queue_wakeup);
-
- while ((next = QSIMPLEQ_FIRST(&tmp_queue_wakeup))) {
- QSIMPLEQ_REMOVE_HEAD(&tmp_queue_wakeup, co_queue_next);
- qemu_coroutine_enter(next);
+ if (lock) {
+ qemu_lockable_lock(lock);
}
}
qemu_co_queue_do_restart(queue, false);
}
-bool qemu_co_enter_next(CoQueue *queue)
+bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock)
{
Coroutine *next;
}
QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
- qemu_coroutine_enter(next);
+ if (lock) {
+ qemu_lockable_unlock(lock);
+ }
+ aio_co_wake(next);
+ if (lock) {
+ qemu_lockable_lock(lock);
+ }
return true;
}
qemu_co_mutex_unlock(&lock->mutex);
}
+void qemu_co_rwlock_downgrade(CoRwlock *lock)
+{
+ Coroutine *self = qemu_coroutine_self();
+
+ /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
+ * qemu_co_rwlock_upgrade.
+ */
+ assert(lock->reader == 0);
+ lock->reader++;
+ qemu_co_mutex_unlock(&lock->mutex);
+
+ /* The rest of the read-side critical section is run without the mutex. */
+ self->locks_held++;
+}
+
void qemu_co_rwlock_wrlock(CoRwlock *lock)
{
qemu_co_mutex_lock(&lock->mutex);
* There is no need to update self->locks_held.
*/
}
+
+void qemu_co_rwlock_upgrade(CoRwlock *lock)
+{
+ Coroutine *self = qemu_coroutine_self();
+
+ qemu_co_mutex_lock(&lock->mutex);
+ assert(lock->reader > 0);
+ lock->reader--;
+ lock->pending_writer++;
+ while (lock->reader) {
+ qemu_co_queue_wait(&lock->queue, &lock->mutex);
+ }
+ lock->pending_writer--;
+
+ /* The rest of the write-side critical section is run with
+ * the mutex taken, similar to qemu_co_rwlock_wrlock. Do
+ * not account for the lock twice in self->locks_held.
+ */
+ self->locks_held--;
+}