]>
Commit | Line | Data |
---|---|---|
20effc67 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | /* | |
7 | * Copyright (c) 2016-present, Facebook, Inc. | |
8 | * All rights reserved. | |
9 | * | |
10 | * This source code is licensed under both the BSD-style license (found in the | |
11 | * LICENSE file in the root directory of this source tree) and the GPLv2 (found | |
12 | * in the COPYING file in the root directory of this source tree). | |
13 | */ | |
14 | #pragma once | |
15 | ||
16 | #include <atomic> | |
17 | #include <cassert> | |
18 | #include <condition_variable> | |
19 | #include <cstddef> | |
20 | #include <functional> | |
21 | #include <mutex> | |
22 | #include <queue> | |
23 | ||
1e59de90 TL |
24 | #include "rocksdb/rocksdb_namespace.h" |
25 | ||
20effc67 TL |
26 | namespace ROCKSDB_NAMESPACE { |
27 | ||
28 | /// Unbounded thread-safe work queue. | |
29 | // | |
30 | // This file is an excerpt from Facebook's zstd repo at | |
31 | // https://github.com/facebook/zstd/. The relevant file is | |
32 | // contrib/pzstd/utils/WorkQueue.h. | |
33 | ||
34 | template <typename T> | |
35 | class WorkQueue { | |
36 | // Protects all member variable access | |
37 | std::mutex mutex_; | |
38 | std::condition_variable readerCv_; | |
39 | std::condition_variable writerCv_; | |
40 | std::condition_variable finishCv_; | |
41 | ||
42 | std::queue<T> queue_; | |
43 | bool done_; | |
44 | std::size_t maxSize_; | |
45 | ||
46 | // Must have lock to call this function | |
47 | bool full() const { | |
48 | if (maxSize_ == 0) { | |
49 | return false; | |
50 | } | |
51 | return queue_.size() >= maxSize_; | |
52 | } | |
53 | ||
54 | public: | |
55 | /** | |
56 | * Constructs an empty work queue with an optional max size. | |
57 | * If `maxSize == 0` the queue size is unbounded. | |
58 | * | |
59 | * @param maxSize The maximum allowed size of the work queue. | |
60 | */ | |
61 | WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {} | |
62 | ||
63 | /** | |
64 | * Push an item onto the work queue. Notify a single thread that work is | |
65 | * available. If `finish()` has been called, do nothing and return false. | |
66 | * If `push()` returns false, then `item` has not been copied from. | |
67 | * | |
68 | * @param item Item to push onto the queue. | |
69 | * @returns True upon success, false if `finish()` has been called. An | |
70 | * item was pushed iff `push()` returns true. | |
71 | */ | |
72 | template <typename U> | |
73 | bool push(U&& item) { | |
74 | { | |
75 | std::unique_lock<std::mutex> lock(mutex_); | |
76 | while (full() && !done_) { | |
77 | writerCv_.wait(lock); | |
78 | } | |
79 | if (done_) { | |
80 | return false; | |
81 | } | |
82 | queue_.push(std::forward<U>(item)); | |
83 | } | |
84 | readerCv_.notify_one(); | |
85 | return true; | |
86 | } | |
87 | ||
88 | /** | |
89 | * Attempts to pop an item off the work queue. It will block until data is | |
90 | * available or `finish()` has been called. | |
91 | * | |
92 | * @param[out] item If `pop` returns `true`, it contains the popped item. | |
93 | * If `pop` returns `false`, it is unmodified. | |
94 | * @returns True upon success. False if the queue is empty and | |
95 | * `finish()` has been called. | |
96 | */ | |
97 | bool pop(T& item) { | |
98 | { | |
99 | std::unique_lock<std::mutex> lock(mutex_); | |
100 | while (queue_.empty() && !done_) { | |
101 | readerCv_.wait(lock); | |
102 | } | |
103 | if (queue_.empty()) { | |
104 | assert(done_); | |
105 | return false; | |
106 | } | |
107 | item = queue_.front(); | |
108 | queue_.pop(); | |
109 | } | |
110 | writerCv_.notify_one(); | |
111 | return true; | |
112 | } | |
113 | ||
114 | /** | |
115 | * Sets the maximum queue size. If `maxSize == 0` then it is unbounded. | |
116 | * | |
117 | * @param maxSize The new maximum queue size. | |
118 | */ | |
119 | void setMaxSize(std::size_t maxSize) { | |
120 | { | |
121 | std::lock_guard<std::mutex> lock(mutex_); | |
122 | maxSize_ = maxSize; | |
123 | } | |
124 | writerCv_.notify_all(); | |
125 | } | |
126 | ||
127 | /** | |
128 | * Promise that `push()` won't be called again, so once the queue is empty | |
129 | * there will never any more work. | |
130 | */ | |
131 | void finish() { | |
132 | { | |
133 | std::lock_guard<std::mutex> lock(mutex_); | |
134 | assert(!done_); | |
135 | done_ = true; | |
136 | } | |
137 | readerCv_.notify_all(); | |
138 | writerCv_.notify_all(); | |
139 | finishCv_.notify_all(); | |
140 | } | |
141 | ||
142 | /// Blocks until `finish()` has been called (but the queue may not be empty). | |
143 | void waitUntilFinished() { | |
144 | std::unique_lock<std::mutex> lock(mutex_); | |
145 | while (!done_) { | |
146 | finishCv_.wait(lock); | |
147 | } | |
148 | } | |
149 | }; | |
150 | } // namespace ROCKSDB_NAMESPACE |