]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/util/work_queue.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / util / work_queue.h
CommitLineData
20effc67
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6/*
7 * Copyright (c) 2016-present, Facebook, Inc.
8 * All rights reserved.
9 *
10 * This source code is licensed under both the BSD-style license (found in the
11 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
12 * in the COPYING file in the root directory of this source tree).
13 */
14#pragma once
15
16#include <atomic>
17#include <cassert>
18#include <condition_variable>
19#include <cstddef>
20#include <functional>
21#include <mutex>
22#include <queue>
23
1e59de90
TL
24#include "rocksdb/rocksdb_namespace.h"
25
20effc67
TL
26namespace ROCKSDB_NAMESPACE {
27
28/// Unbounded thread-safe work queue.
29//
30// This file is an excerpt from Facebook's zstd repo at
31// https://github.com/facebook/zstd/. The relevant file is
32// contrib/pzstd/utils/WorkQueue.h.
33
template <typename T>
class WorkQueue {
  // Protects all member variable access
  std::mutex mutex_;
  // Notified by push() (one waiter) and finish() (all waiters).
  std::condition_variable readerCv_;
  // Notified by pop() (one waiter), setMaxSize() and finish() (all waiters).
  std::condition_variable writerCv_;
  // Notified by finish() only; waited on by waitUntilFinished().
  std::condition_variable finishCv_;

  std::queue<T> queue_;
  // Set once by finish(); never reset.
  bool done_;
  // Maximum number of queued items; 0 means "no limit".
  std::size_t maxSize_;

  // Must have lock to call this function
  bool full() const { return maxSize_ != 0 && queue_.size() >= maxSize_; }

 public:
  /**
   * Constructs an empty work queue with an optional max size.
   * If `maxSize == 0` the queue size is unbounded.
   *
   * @param maxSize The maximum allowed size of the work queue.
   */
  WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}

  /**
   * Push an item onto the work queue, blocking while the queue is full and
   * `finish()` has not been called. Notifies a single thread that work is
   * available. If `finish()` has been called, do nothing and return false.
   * If `push()` returns false, then `item` has not been moved or copied from.
   *
   * @param item  Item to push onto the queue.
   * @returns     True upon success, false if `finish()` has been called.  An
   *              item was pushed iff `push()` returns true.
   */
  template <typename U>
  bool push(U&& item) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      // Wake up either when there is room or when the queue was finished.
      writerCv_.wait(lock, [this] { return done_ || !full(); });
      if (done_) {
        return false;
      }
      queue_.push(std::forward<U>(item));
    }
    // Notify after releasing the lock so the woken reader can take it at once.
    readerCv_.notify_one();
    return true;
  }

  /**
   * Attempts to pop an item off the work queue.  It will block until data is
   * available or `finish()` has been called. Items still queued when
   * `finish()` is called can still be popped.
   *
   * @param[out] item  If `pop` returns `true`, it contains the popped item.
   *                    If `pop` returns `false`, it is unmodified.
   * @returns          True upon success.  False if the queue is empty and
   *                    `finish()` has been called.
   */
  bool pop(T& item) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      readerCv_.wait(lock, [this] { return done_ || !queue_.empty(); });
      if (queue_.empty()) {
        assert(done_);
        return false;
      }
      // The front element is destroyed by the pop() below, so move out of it
      // rather than copying. This also makes move-only `T` usable.
      item = std::move(queue_.front());
      queue_.pop();
    }
    // A slot was freed: let one blocked writer proceed.
    writerCv_.notify_one();
    return true;
  }

  /**
   * Sets the maximum queue size.  If `maxSize == 0` then it is unbounded.
   * All blocked writers are woken so they re-check against the new limit.
   *
   * @param maxSize The new maximum queue size.
   */
  void setMaxSize(std::size_t maxSize) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      maxSize_ = maxSize;
    }
    writerCv_.notify_all();
  }

  /**
   * Promise that `push()` won't be called again, so once the queue is empty
   * there will never be any more work. Must be called at most once (asserted
   * in debug builds). Wakes every thread blocked in `push()`, `pop()` or
   * `waitUntilFinished()`.
   */
  void finish() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      assert(!done_);
      done_ = true;
    }
    readerCv_.notify_all();
    writerCv_.notify_all();
    finishCv_.notify_all();
  }

  /// Blocks until `finish()` has been called (but the queue may not be empty).
  void waitUntilFinished() {
    std::unique_lock<std::mutex> lock(mutex_);
    finishCv_.wait(lock, [this] { return done_; });
  }
};
150} // namespace ROCKSDB_NAMESPACE