// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once
#include <cassert>
#include <atomic>
#include <memory>
#include <utility>
#include "port/likely.h"
#include "util/allocator.h"
#include "util/arena.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"

// Only annotate the padding arrays as unused under clang; emitting the
// attribute under GCC 4.8.1 makes the build fail.
#ifdef __clang__
#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
#else
#define ROCKSDB_FIELD_UNUSED
#endif  // __clang__

namespace rocksdb {

class Logger;

// ConcurrentArena wraps an Arena. It makes it thread safe using a fast
// inlined spinlock, and adds small per-core allocation caches to avoid
// contention for small allocations. To avoid any memory waste from the
// per-core shards, they are kept small, they are lazily instantiated
// only if ConcurrentArena actually notices concurrent use, and they
// adjust their size so that there is no fragmentation waste when the
// shard blocks are allocated from the underlying main arena.
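//
// Illustrative usage sketch (not part of this header; the block size below is
// chosen arbitrarily for the example):
//
//   rocksdb::ConcurrentArena arena(4 << 20 /* block_size */);
//   char* raw = arena.Allocate(48);              // may come from a per-core shard
//   char* aligned = arena.AllocateAligned(100);  // rounded up to pointer size
//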
class ConcurrentArena : public Allocator {
 public:
  // block_size and huge_page_size are the same as for Arena (and are
  // in fact just passed to the constructor of arena_). The core-local
  // shards compute their shard_block_size as a fraction of block_size
  // that varies according to the hardware concurrency level.
  explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
                           size_t huge_page_size = 0);

  char* Allocate(size_t bytes) override {
    return AllocateImpl(bytes, false /*force_arena*/,
                        [=]() { return arena_.Allocate(bytes); });
  }

  char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
                        Logger* logger = nullptr) override {
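    // Round bytes up to the next multiple of sizeof(void*) with a bit trick;
    // e.g. with 8-byte pointers, 13 becomes ((13 - 1) | 7) + 1 == 16.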
    size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1;
    assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) &&
           (rounded_up % sizeof(void*)) == 0);

    return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/, [=]() {
      return arena_.AllocateAligned(rounded_up, huge_page_size, logger);
    });
  }

  size_t ApproximateMemoryUsage() const {
    std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
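    // Take the arena mutex only when more than one shard is in use; with a
    // single shard the unlocked read below is treated as good enough for an
    // approximation.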
    if (index_mask_ != 0) {
      lock.lock();
    }
    return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
  }

  size_t MemoryAllocatedBytes() const {
    return memory_allocated_bytes_.load(std::memory_order_relaxed);
  }

  size_t AllocatedAndUnused() const {
    return arena_allocated_and_unused_.load(std::memory_order_relaxed) +
           ShardAllocatedAndUnused();
  }

  size_t IrregularBlockNum() const {
    return irregular_block_num_.load(std::memory_order_relaxed);
  }

  size_t BlockSize() const override { return arena_.BlockSize(); }

 private:
  struct Shard {
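    // The padding below is presumably meant to keep each Shard on (roughly)
    // its own cache line so cores do not false-share the mutex and counters.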
    char padding[40] ROCKSDB_FIELD_UNUSED;
    mutable SpinMutex mutex;
    char* free_begin_;
    std::atomic<size_t> allocated_and_unused_;

    Shard() : allocated_and_unused_(0) {}
  };

#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
  static __thread uint32_t tls_cpuid;
#else
  enum ZeroFirstEnum : uint32_t { tls_cpuid = 0 };
#endif

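  // padding0 (and padding1 further down) presumably separates the hot members
  // of this class onto their own cache lines.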
  char padding0[56] ROCKSDB_FIELD_UNUSED;

  size_t shard_block_size_;

  // shards_[i & index_mask_] is valid
  size_t index_mask_;
  std::unique_ptr<Shard[]> shards_;

  Arena arena_;
  mutable SpinMutex arena_mutex_;
  std::atomic<size_t> arena_allocated_and_unused_;
  std::atomic<size_t> memory_allocated_bytes_;
  std::atomic<size_t> irregular_block_num_;

  char padding1[56] ROCKSDB_FIELD_UNUSED;

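  // Picks a different shard for the calling thread (typically based on the
  // current core) when the cached one is contended; defined out of line in
  // concurrent_arena.cc.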
  Shard* Repick();

  size_t ShardAllocatedAndUnused() const {
    size_t total = 0;
    for (size_t i = 0; i <= index_mask_; ++i) {
      total += shards_[i].allocated_and_unused_.load(std::memory_order_relaxed);
    }
    return total;
  }

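  // Allocation either goes straight to the main arena via `func` (holding
  // arena_mutex_) or is carved out of a per-core shard's current block.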
  template <typename Func>
  char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
    uint32_t cpu;

    // Go directly to the arena if the allocation is too large, or if
    // we've never needed to Repick() and the arena mutex is available
    // with no waiting. This keeps the fragmentation penalty of
    // concurrency zero unless it might actually confer an advantage.
    std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
    if (bytes > shard_block_size_ / 4 || force_arena ||
        ((cpu = tls_cpuid) == 0 &&
         !shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) &&
         arena_lock.try_lock())) {
      if (!arena_lock.owns_lock()) {
        arena_lock.lock();
      }
      auto rv = func();
      Fixup();
      return rv;
    }

    // pick a shard from which to allocate
    Shard* s = &shards_[cpu & index_mask_];
    if (!s->mutex.try_lock()) {
      s = Repick();
      s->mutex.lock();
    }
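    // Hand the already-locked shard mutex to a scoped guard so it is released
    // on every return path below.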
    std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);

    size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
    if (avail < bytes) {
      // reload
      std::lock_guard<SpinMutex> reload_lock(arena_mutex_);

      // If the arena's current block is within a factor of 2 of the right
      // size, we adjust our request to avoid arena waste.
      auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
      assert(exact == arena_.AllocatedAndUnused());
      avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
                  ? exact
                  : shard_block_size_;
      s->free_begin_ = arena_.AllocateAligned(avail);
      Fixup();
    }
    s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);

    char* rv;
    if ((bytes % sizeof(void*)) == 0) {
      // aligned allocation from the beginning
      rv = s->free_begin_;
      s->free_begin_ += bytes;
    } else {
      // unaligned from the end
      rv = s->free_begin_ + avail - bytes;
    }
    return rv;
  }

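  // Re-publishes the arena's counters into the relaxed atomics above; both
  // call sites in AllocateImpl hold arena_mutex_ when invoking this.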
  void Fixup() {
    arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(),
                                      std::memory_order_relaxed);
    memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(),
                                  std::memory_order_relaxed);
    irregular_block_num_.store(arena_.IrregularBlockNum(),
                               std::memory_order_relaxed);
  }

  ConcurrentArena(const ConcurrentArena&) = delete;
  ConcurrentArena& operator=(const ConcurrentArena&) = delete;
};

}  // namespace rocksdb