]> git.proxmox.com Git - mirror_zfs.git/blame - module/zfs/bqueue.c
ddt: modernise assertions
[mirror_zfs.git] / module / zfs / bqueue.c
CommitLineData
fcff0f35
PD
1/*
2 * CDDL HEADER START
3 *
4 * This file and its contents are supplied under the terms of the
5 * Common Development and Distribution License ("CDDL"), version 1.0.
6 * You may only use this file in accordance with the terms of version
7 * 1.0 of the CDDL.
8 *
9 * A full copy of the text of the CDDL should have accompanied this
10 * source. A copy of the CDDL is also available via the Internet at
11 * http://www.illumos.org/license/CDDL.
12 *
13 * CDDL HEADER END
14 */
15/*
30af21b0 16 * Copyright (c) 2014, 2018 by Delphix. All rights reserved.
fcff0f35
PD
17 */
18
19#include <sys/bqueue.h>
20#include <sys/zfs_context.h>
21
22static inline bqueue_node_t *
23obj2node(bqueue_t *q, void *data)
24{
25 return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
26}
27
/*
 * Initialize a blocking queue. The maximum capacity of the queue is set to
 * size. Types that are stored in a bqueue must contain a bqueue_node_t, and
 * node_offset must be its offset from the start of the struct. fill_fraction
 * is a performance tuning value; when the queue is full, any threads
 * attempting to enqueue records will block. They will block until they're
 * signaled, which will occur when the queue is at least 1/fill_fraction
 * empty. Similar behavior occurs on dequeue; if the queue is empty, threads
 * block. They will be signaled when the queue is at least 1/fill_fraction
 * full. As a result, you must call bqueue_enqueue_flush() when you enqueue
 * your final record on a thread, in case the dequeuing threads are currently
 * blocked and that enqueue does not cause them to be woken. Alternatively,
 * this behavior can be disabled (causing signaling to happen immediately) by
 * setting fill_fraction to any value larger than size. Return 0 on success,
 * or -1 on failure.
 *
 * Note: The caller must ensure that for a given bqueue_t, there's only a
 * single call to bqueue_enqueue() running at a time (e.g. by calling only
 * from a single thread, or with locking around the call). Similarly, the
 * caller must ensure that there's only a single call to bqueue_dequeue()
 * running at a time. However, the one call to bqueue_enqueue() may be
 * invoked concurrently with the one call to bqueue_dequeue().
 */
51int
577d41d3 52bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
fcff0f35 53{
30af21b0
PD
54 if (fill_fraction == 0) {
55 return (-1);
56 }
fcff0f35
PD
57 list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
58 node_offset + offsetof(bqueue_node_t, bqn_node));
fc45975e
MA
59 list_create(&q->bq_dequeuing_list, node_offset + sizeof (bqueue_node_t),
60 node_offset + offsetof(bqueue_node_t, bqn_node));
61 list_create(&q->bq_enqueuing_list, node_offset + sizeof (bqueue_node_t),
62 node_offset + offsetof(bqueue_node_t, bqn_node));
fcff0f35
PD
63 cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
64 cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
65 mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
66 q->bq_node_offset = node_offset;
67 q->bq_size = 0;
fc45975e
MA
68 q->bq_dequeuing_size = 0;
69 q->bq_enqueuing_size = 0;
fcff0f35 70 q->bq_maxsize = size;
30af21b0 71 q->bq_fill_fraction = fill_fraction;
fcff0f35
PD
72 return (0);
73}
74
75/*
76 * Destroy a blocking queue. This function asserts that there are no
77 * elements in the queue, and no one is blocked on the condition
78 * variables.
79 */
80void
81bqueue_destroy(bqueue_t *q)
82{
30af21b0 83 mutex_enter(&q->bq_lock);
fcff0f35 84 ASSERT0(q->bq_size);
fc45975e
MA
85 ASSERT0(q->bq_dequeuing_size);
86 ASSERT0(q->bq_enqueuing_size);
fcff0f35
PD
87 cv_destroy(&q->bq_add_cv);
88 cv_destroy(&q->bq_pop_cv);
fcff0f35 89 list_destroy(&q->bq_list);
fc45975e
MA
90 list_destroy(&q->bq_dequeuing_list);
91 list_destroy(&q->bq_enqueuing_list);
30af21b0
PD
92 mutex_exit(&q->bq_lock);
93 mutex_destroy(&q->bq_lock);
fcff0f35
PD
94}
95
30af21b0 96static void
577d41d3 97bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush)
fcff0f35
PD
98{
99 ASSERT3U(item_size, >, 0);
e9e3d31d 100 ASSERT3U(item_size, <=, q->bq_maxsize);
fc45975e 101
fcff0f35 102 obj2node(q, data)->bqn_size = item_size;
fc45975e
MA
103 q->bq_enqueuing_size += item_size;
104 list_insert_tail(&q->bq_enqueuing_list, data);
105
106 if (flush ||
107 q->bq_enqueuing_size >= q->bq_maxsize / q->bq_fill_fraction) {
108 /* Append the enquing list to the shared list. */
109 mutex_enter(&q->bq_lock);
110 while (q->bq_size > q->bq_maxsize) {
111 cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
112 }
113 q->bq_size += q->bq_enqueuing_size;
114 list_move_tail(&q->bq_list, &q->bq_enqueuing_list);
115 q->bq_enqueuing_size = 0;
30af21b0 116 cv_broadcast(&q->bq_pop_cv);
fc45975e
MA
117 mutex_exit(&q->bq_lock);
118 }
fcff0f35 119}
30af21b0
PD
120
/*
 * Add data to q, consuming item_size units of capacity. If there is
 * insufficient capacity to consume item_size units, block until capacity
 * exists. Asserts item_size is > 0.
 */
126void
577d41d3 127bqueue_enqueue(bqueue_t *q, void *data, size_t item_size)
30af21b0
PD
128{
129 bqueue_enqueue_impl(q, data, item_size, B_FALSE);
130}
131
/*
 * Enqueue an entry, and then flush the queue. This forces the popping threads
 * to wake up, even if we're below the fill fraction. We have this in a single
 * function, rather than having a separate call, because it prevents race
 * conditions between the enqueuing thread and the dequeuing thread, where the
 * enqueuing thread wakes up the dequeuing thread and that thread then
 * destroys the condvar before the enqueuing thread is done.
 */
140void
577d41d3 141bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size)
30af21b0
PD
142{
143 bqueue_enqueue_impl(q, data, item_size, B_TRUE);
144}
145
fcff0f35
PD
146/*
147 * Take the first element off of q. If there are no elements on the queue, wait
148 * until one is put there. Return the removed element.
149 */
150void *
151bqueue_dequeue(bqueue_t *q)
152{
fc45975e
MA
153 void *ret = list_remove_head(&q->bq_dequeuing_list);
154 if (ret == NULL) {
155 /*
156 * Dequeuing list is empty. Wait for there to be something on
157 * the shared list, then move the entire shared list to the
158 * dequeuing list.
159 */
160 mutex_enter(&q->bq_lock);
161 while (q->bq_size == 0) {
162 cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
163 }
164 ASSERT0(q->bq_dequeuing_size);
165 ASSERT(list_is_empty(&q->bq_dequeuing_list));
166 list_move_tail(&q->bq_dequeuing_list, &q->bq_list);
167 q->bq_dequeuing_size = q->bq_size;
168 q->bq_size = 0;
169 cv_broadcast(&q->bq_add_cv);
170 mutex_exit(&q->bq_lock);
171 ret = list_remove_head(&q->bq_dequeuing_list);
fcff0f35 172 }
fc45975e 173 q->bq_dequeuing_size -= obj2node(q, ret)->bqn_size;
fcff0f35
PD
174 return (ret);
175}