]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2013, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | #pragma once |
6 | ||
7 | #include <list> | |
8 | #include <memory> | |
9 | #include <string> | |
10 | ||
f67539c2 | 11 | #include "memory/arena.h" |
1e59de90 | 12 | #include "rocksdb/comparator.h" |
7c673cae FG |
13 | #include "util/mutexlock.h" |
14 | ||
f67539c2 | 15 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
16 | |
17 | // | |
18 | // CacheWriteBuffer | |
19 | // | |
20 | // Buffer abstraction that can be manipulated via append | |
21 | // (not thread safe) | |
22 | class CacheWriteBuffer { | |
23 | public: | |
24 | explicit CacheWriteBuffer(const size_t size) : size_(size), pos_(0) { | |
25 | buf_.reset(new char[size_]); | |
26 | assert(!pos_); | |
27 | assert(size_); | |
28 | } | |
29 | ||
30 | virtual ~CacheWriteBuffer() {} | |
31 | ||
32 | void Append(const char* buf, const size_t size) { | |
33 | assert(pos_ + size <= size_); | |
34 | memcpy(buf_.get() + pos_, buf, size); | |
35 | pos_ += size; | |
36 | assert(pos_ <= size_); | |
37 | } | |
38 | ||
39 | void FillTrailingZeros() { | |
40 | assert(pos_ <= size_); | |
41 | memset(buf_.get() + pos_, '0', size_ - pos_); | |
42 | pos_ = size_; | |
43 | } | |
44 | ||
45 | void Reset() { pos_ = 0; } | |
46 | size_t Free() const { return size_ - pos_; } | |
47 | size_t Capacity() const { return size_; } | |
48 | size_t Used() const { return pos_; } | |
49 | char* Data() const { return buf_.get(); } | |
50 | ||
51 | private: | |
52 | std::unique_ptr<char[]> buf_; | |
53 | const size_t size_; | |
54 | size_t pos_; | |
55 | }; | |
56 | ||
57 | // | |
58 | // CacheWriteBufferAllocator | |
59 | // | |
60 | // Buffer pool abstraction(not thread safe) | |
61 | // | |
62 | class CacheWriteBufferAllocator { | |
63 | public: | |
64 | explicit CacheWriteBufferAllocator(const size_t buffer_size, | |
65 | const size_t buffer_count) | |
66 | : cond_empty_(&lock_), buffer_size_(buffer_size) { | |
67 | MutexLock _(&lock_); | |
68 | buffer_size_ = buffer_size; | |
69 | for (uint32_t i = 0; i < buffer_count; i++) { | |
70 | auto* buf = new CacheWriteBuffer(buffer_size_); | |
71 | assert(buf); | |
72 | if (buf) { | |
73 | bufs_.push_back(buf); | |
74 | cond_empty_.Signal(); | |
75 | } | |
76 | } | |
77 | } | |
78 | ||
79 | virtual ~CacheWriteBufferAllocator() { | |
80 | MutexLock _(&lock_); | |
81 | assert(bufs_.size() * buffer_size_ == Capacity()); | |
82 | for (auto* buf : bufs_) { | |
83 | delete buf; | |
84 | } | |
85 | bufs_.clear(); | |
86 | } | |
87 | ||
88 | CacheWriteBuffer* Allocate() { | |
89 | MutexLock _(&lock_); | |
90 | if (bufs_.empty()) { | |
91 | return nullptr; | |
92 | } | |
93 | ||
94 | assert(!bufs_.empty()); | |
95 | CacheWriteBuffer* const buf = bufs_.front(); | |
96 | bufs_.pop_front(); | |
97 | return buf; | |
98 | } | |
99 | ||
100 | void Deallocate(CacheWriteBuffer* const buf) { | |
101 | assert(buf); | |
102 | MutexLock _(&lock_); | |
103 | buf->Reset(); | |
104 | bufs_.push_back(buf); | |
105 | cond_empty_.Signal(); | |
106 | } | |
107 | ||
108 | void WaitUntilUsable() { | |
109 | // We are asked to wait till we have buffers available | |
110 | MutexLock _(&lock_); | |
111 | while (bufs_.empty()) { | |
112 | cond_empty_.Wait(); | |
113 | } | |
114 | } | |
115 | ||
116 | size_t Capacity() const { return bufs_.size() * buffer_size_; } | |
117 | size_t Free() const { return bufs_.size() * buffer_size_; } | |
118 | size_t BufferSize() const { return buffer_size_; } | |
119 | ||
120 | private: | |
121 | port::Mutex lock_; // Sync lock | |
122 | port::CondVar cond_empty_; // Condition var for empty buffers | |
123 | size_t buffer_size_; // Size of each buffer | |
124 | std::list<CacheWriteBuffer*> bufs_; // Buffer stash | |
125 | }; | |
126 | ||
f67539c2 | 127 | } // namespace ROCKSDB_NAMESPACE |