/*
 * CDDL HEADER START
 *
 * This file and its contents are supplied under the terms of the
 * Common Development and Distribution License ("CDDL"), version 1.0.
 * You may only use this file in accordance with the terms of version
 * 1.0 of the CDDL.
 *
 * A full copy of the text of the CDDL should have accompanied this
 * source.  A copy of the CDDL is also available via the Internet at
 * http://www.illumos.org/license/CDDL.
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2014, 2018 by Delphix. All rights reserved.
 */
18 | ||
19 | #include <sys/bqueue.h> | |
20 | #include <sys/zfs_context.h> | |
21 | ||
22 | static inline bqueue_node_t * | |
23 | obj2node(bqueue_t *q, void *data) | |
24 | { | |
25 | return ((bqueue_node_t *)((char *)data + q->bq_node_offset)); | |
26 | } | |
27 | ||
28 | /* | |
29 | * Initialize a blocking queue The maximum capacity of the queue is set to | |
fc45975e MA |
30 | * size. Types that are stored in a bqueue must contain a bqueue_node_t, and |
31 | * node_offset must be its offset from the start of the struct. fill_fraction | |
32 | * is a performance tuning value; when the queue is full, any threads | |
33 | * attempting to enqueue records will block. They will block until they're | |
34 | * signaled, which will occur when the queue is at least 1/fill_fraction | |
30af21b0 | 35 | * empty. Similar behavior occurs on dequeue; if the queue is empty, threads |
fc45975e MA |
36 | * block. They will be signalled when the queue has 1/fill_fraction full. |
37 | * As a result, you must call bqueue_enqueue_flush() when you enqueue your | |
38 | * final record on a thread, in case the dequeuing threads are currently | |
39 | * blocked and that enqueue does not cause them to be woken. Alternatively, | |
40 | * this behavior can be disabled (causing signaling to happen immediately) by | |
41 | * setting fill_fraction to any value larger than size. Return 0 on success, | |
42 | * or -1 on failure. | |
43 | * | |
44 | * Note: The caller must ensure that for a given bqueue_t, there's only a | |
45 | * single call to bqueue_enqueue() running at a time (e.g. by calling only | |
46 | * from a single thread, or with locking around the call). Similarly, the | |
47 | * caller must ensure that there's only a single call to bqueue_dequeue() | |
48 | * running at a time. However, the one call to bqueue_enqueue() may be | |
49 | * invoked concurrently with the one call to bqueue_dequeue(). | |
fcff0f35 PD |
50 | */ |
51 | int | |
577d41d3 | 52 | bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset) |
fcff0f35 | 53 | { |
30af21b0 PD |
54 | if (fill_fraction == 0) { |
55 | return (-1); | |
56 | } | |
fcff0f35 PD |
57 | list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t), |
58 | node_offset + offsetof(bqueue_node_t, bqn_node)); | |
fc45975e MA |
59 | list_create(&q->bq_dequeuing_list, node_offset + sizeof (bqueue_node_t), |
60 | node_offset + offsetof(bqueue_node_t, bqn_node)); | |
61 | list_create(&q->bq_enqueuing_list, node_offset + sizeof (bqueue_node_t), | |
62 | node_offset + offsetof(bqueue_node_t, bqn_node)); | |
fcff0f35 PD |
63 | cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL); |
64 | cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL); | |
65 | mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL); | |
66 | q->bq_node_offset = node_offset; | |
67 | q->bq_size = 0; | |
fc45975e MA |
68 | q->bq_dequeuing_size = 0; |
69 | q->bq_enqueuing_size = 0; | |
fcff0f35 | 70 | q->bq_maxsize = size; |
30af21b0 | 71 | q->bq_fill_fraction = fill_fraction; |
fcff0f35 PD |
72 | return (0); |
73 | } | |
74 | ||
75 | /* | |
76 | * Destroy a blocking queue. This function asserts that there are no | |
77 | * elements in the queue, and no one is blocked on the condition | |
78 | * variables. | |
79 | */ | |
80 | void | |
81 | bqueue_destroy(bqueue_t *q) | |
82 | { | |
30af21b0 | 83 | mutex_enter(&q->bq_lock); |
fcff0f35 | 84 | ASSERT0(q->bq_size); |
fc45975e MA |
85 | ASSERT0(q->bq_dequeuing_size); |
86 | ASSERT0(q->bq_enqueuing_size); | |
fcff0f35 PD |
87 | cv_destroy(&q->bq_add_cv); |
88 | cv_destroy(&q->bq_pop_cv); | |
fcff0f35 | 89 | list_destroy(&q->bq_list); |
fc45975e MA |
90 | list_destroy(&q->bq_dequeuing_list); |
91 | list_destroy(&q->bq_enqueuing_list); | |
30af21b0 PD |
92 | mutex_exit(&q->bq_lock); |
93 | mutex_destroy(&q->bq_lock); | |
fcff0f35 PD |
94 | } |
95 | ||
30af21b0 | 96 | static void |
577d41d3 | 97 | bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush) |
fcff0f35 PD |
98 | { |
99 | ASSERT3U(item_size, >, 0); | |
e9e3d31d | 100 | ASSERT3U(item_size, <=, q->bq_maxsize); |
fc45975e | 101 | |
fcff0f35 | 102 | obj2node(q, data)->bqn_size = item_size; |
fc45975e MA |
103 | q->bq_enqueuing_size += item_size; |
104 | list_insert_tail(&q->bq_enqueuing_list, data); | |
105 | ||
106 | if (flush || | |
107 | q->bq_enqueuing_size >= q->bq_maxsize / q->bq_fill_fraction) { | |
108 | /* Append the enquing list to the shared list. */ | |
109 | mutex_enter(&q->bq_lock); | |
110 | while (q->bq_size > q->bq_maxsize) { | |
111 | cv_wait_sig(&q->bq_add_cv, &q->bq_lock); | |
112 | } | |
113 | q->bq_size += q->bq_enqueuing_size; | |
114 | list_move_tail(&q->bq_list, &q->bq_enqueuing_list); | |
115 | q->bq_enqueuing_size = 0; | |
30af21b0 | 116 | cv_broadcast(&q->bq_pop_cv); |
fc45975e MA |
117 | mutex_exit(&q->bq_lock); |
118 | } | |
fcff0f35 | 119 | } |
30af21b0 PD |
120 | |
121 | /* | |
122 | * Add data to q, consuming size units of capacity. If there is insufficient | |
123 | * capacity to consume size units, block until capacity exists. Asserts size is | |
124 | * > 0. | |
125 | */ | |
126 | void | |
577d41d3 | 127 | bqueue_enqueue(bqueue_t *q, void *data, size_t item_size) |
30af21b0 PD |
128 | { |
129 | bqueue_enqueue_impl(q, data, item_size, B_FALSE); | |
130 | } | |
131 | ||
132 | /* | |
133 | * Enqueue an entry, and then flush the queue. This forces the popping threads | |
134 | * to wake up, even if we're below the fill fraction. We have this in a single | |
135 | * function, rather than having a separate call, because it prevents race | |
fc45975e MA |
136 | * conditions between the enqueuing thread and the dequeuing thread, where the |
137 | * enqueueing thread will wake up the dequeuing thread, that thread will | |
30af21b0 PD |
138 | * destroy the condvar before the enqueuing thread is done. |
139 | */ | |
140 | void | |
577d41d3 | 141 | bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size) |
30af21b0 PD |
142 | { |
143 | bqueue_enqueue_impl(q, data, item_size, B_TRUE); | |
144 | } | |
145 | ||
fcff0f35 PD |
146 | /* |
147 | * Take the first element off of q. If there are no elements on the queue, wait | |
148 | * until one is put there. Return the removed element. | |
149 | */ | |
150 | void * | |
151 | bqueue_dequeue(bqueue_t *q) | |
152 | { | |
fc45975e MA |
153 | void *ret = list_remove_head(&q->bq_dequeuing_list); |
154 | if (ret == NULL) { | |
155 | /* | |
156 | * Dequeuing list is empty. Wait for there to be something on | |
157 | * the shared list, then move the entire shared list to the | |
158 | * dequeuing list. | |
159 | */ | |
160 | mutex_enter(&q->bq_lock); | |
161 | while (q->bq_size == 0) { | |
162 | cv_wait_sig(&q->bq_pop_cv, &q->bq_lock); | |
163 | } | |
164 | ASSERT0(q->bq_dequeuing_size); | |
165 | ASSERT(list_is_empty(&q->bq_dequeuing_list)); | |
166 | list_move_tail(&q->bq_dequeuing_list, &q->bq_list); | |
167 | q->bq_dequeuing_size = q->bq_size; | |
168 | q->bq_size = 0; | |
169 | cv_broadcast(&q->bq_add_cv); | |
170 | mutex_exit(&q->bq_lock); | |
171 | ret = list_remove_head(&q->bq_dequeuing_list); | |
fcff0f35 | 172 | } |
fc45975e | 173 | q->bq_dequeuing_size -= obj2node(q, ret)->bqn_size; |
fcff0f35 PD |
174 | return (ret); |
175 | } |