]>
Commit | Line | Data |
---|---|---|
fcff0f35 PD |
1 | /* |
2 | * CDDL HEADER START | |
3 | * | |
4 | * This file and its contents are supplied under the terms of the | |
5 | * Common Development and Distribution License ("CDDL"), version 1.0. | |
6 | * You may only use this file in accordance with the terms of version | |
7 | * 1.0 of the CDDL. | |
8 | * | |
9 | * A full copy of the text of the CDDL should have accompanied this | |
10 | * source. A copy of the CDDL is also available via the Internet at | |
11 | * http://www.illumos.org/license/CDDL. | |
12 | * | |
13 | * CDDL HEADER END | |
14 | */ | |
15 | /* | |
16 | * Copyright (c) 2014 by Delphix. All rights reserved. | |
17 | */ | |
18 | ||
19 | #include <sys/bqueue.h> | |
20 | #include <sys/zfs_context.h> | |
21 | ||
22 | static inline bqueue_node_t * | |
23 | obj2node(bqueue_t *q, void *data) | |
24 | { | |
25 | return ((bqueue_node_t *)((char *)data + q->bq_node_offset)); | |
26 | } | |
27 | ||
28 | /* | |
29 | * Initialize a blocking queue The maximum capacity of the queue is set to | |
30 | * size. Types that want to be stored in a bqueue must contain a bqueue_node_t, | |
31 | * and offset should give its offset from the start of the struct. Return 0 on | |
32 | * success, or -1 on failure. | |
33 | */ | |
34 | int | |
35 | bqueue_init(bqueue_t *q, uint64_t size, size_t node_offset) | |
36 | { | |
37 | list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t), | |
38 | node_offset + offsetof(bqueue_node_t, bqn_node)); | |
39 | cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL); | |
40 | cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL); | |
41 | mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL); | |
42 | q->bq_node_offset = node_offset; | |
43 | q->bq_size = 0; | |
44 | q->bq_maxsize = size; | |
45 | return (0); | |
46 | } | |
47 | ||
48 | /* | |
49 | * Destroy a blocking queue. This function asserts that there are no | |
50 | * elements in the queue, and no one is blocked on the condition | |
51 | * variables. | |
52 | */ | |
53 | void | |
54 | bqueue_destroy(bqueue_t *q) | |
55 | { | |
56 | ASSERT0(q->bq_size); | |
57 | cv_destroy(&q->bq_add_cv); | |
58 | cv_destroy(&q->bq_pop_cv); | |
59 | mutex_destroy(&q->bq_lock); | |
60 | list_destroy(&q->bq_list); | |
61 | } | |
62 | ||
63 | /* | |
64 | * Add data to q, consuming size units of capacity. If there is insufficient | |
65 | * capacity to consume size units, block until capacity exists. Asserts size is | |
66 | * > 0. | |
67 | */ | |
68 | void | |
69 | bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size) | |
70 | { | |
71 | ASSERT3U(item_size, >, 0); | |
e9e3d31d | 72 | ASSERT3U(item_size, <=, q->bq_maxsize); |
fcff0f35 PD |
73 | mutex_enter(&q->bq_lock); |
74 | obj2node(q, data)->bqn_size = item_size; | |
75 | while (q->bq_size + item_size > q->bq_maxsize) { | |
76 | cv_wait(&q->bq_add_cv, &q->bq_lock); | |
77 | } | |
78 | q->bq_size += item_size; | |
79 | list_insert_tail(&q->bq_list, data); | |
80 | cv_signal(&q->bq_pop_cv); | |
81 | mutex_exit(&q->bq_lock); | |
82 | } | |
83 | /* | |
84 | * Take the first element off of q. If there are no elements on the queue, wait | |
85 | * until one is put there. Return the removed element. | |
86 | */ | |
87 | void * | |
88 | bqueue_dequeue(bqueue_t *q) | |
89 | { | |
ccc92611 | 90 | void *ret = NULL; |
fcff0f35 PD |
91 | uint64_t item_size; |
92 | mutex_enter(&q->bq_lock); | |
93 | while (q->bq_size == 0) { | |
94 | cv_wait(&q->bq_pop_cv, &q->bq_lock); | |
95 | } | |
96 | ret = list_remove_head(&q->bq_list); | |
ccc92611 | 97 | ASSERT3P(ret, !=, NULL); |
fcff0f35 PD |
98 | item_size = obj2node(q, ret)->bqn_size; |
99 | q->bq_size -= item_size; | |
fcff0f35 | 100 | cv_signal(&q->bq_add_cv); |
03928896 | 101 | mutex_exit(&q->bq_lock); |
fcff0f35 PD |
102 | return (ret); |
103 | } | |
104 | ||
105 | /* | |
106 | * Returns true if the space used is 0. | |
107 | */ | |
108 | boolean_t | |
109 | bqueue_empty(bqueue_t *q) | |
110 | { | |
111 | return (q->bq_size == 0); | |
112 | } |