//  Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once
#include <atomic>
#include <memory>
#include <utility>

#include "memory/allocator.h"
#include "memory/arena.h"
#include "port/lang.h"
#include "port/likely.h"
#include "util/core_local.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"

// Only generate the field-unused attribute for the padding arrays under
// Clang; otherwise the build under GCC 4.8.1 will fail.
#ifdef __clang__
#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
#else
#define ROCKSDB_FIELD_UNUSED
#endif  // __clang__

namespace ROCKSDB_NAMESPACE {

class Logger;

// ConcurrentArena wraps an Arena. It makes it thread safe using a fast
// inlined spinlock, and adds small per-core allocation caches to avoid
// contention for small allocations. To avoid any memory waste from the
// per-core shards, they are kept small, they are lazily instantiated
// only if ConcurrentArena actually notices concurrent use, and they
// adjust their size so that there is no fragmentation waste when the
// shard blocks are allocated from the underlying main arena.
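//
// Example (illustrative only; sizes are arbitrary):
//
//   ConcurrentArena arena(4 << 20 /* block_size */);
//   // Any number of threads may allocate concurrently.
//   char* p = arena.Allocate(24);
//   char* q = arena.AllocateAligned(128);
//
// All allocations are released together when the arena is destroyed.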
class ConcurrentArena : public Allocator {
 public:
  // block_size and huge_page_size are the same as for Arena (and are
  // in fact just passed to the constructor of arena_). The core-local
  // shards compute their shard_block_size as a fraction of block_size
  // that varies according to the hardware concurrency level.
  explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
                           AllocTracker* tracker = nullptr,
                           size_t huge_page_size = 0);

  char* Allocate(size_t bytes) override {
    return AllocateImpl(bytes, false /*force_arena*/,
                        [this, bytes]() { return arena_.Allocate(bytes); });
  }

  char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
                        Logger* logger = nullptr) override {
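    // Round bytes up to the next multiple of sizeof(void*), e.g. a request
    // for 13 bytes becomes 16 on a platform with 8-byte pointers.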
    size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1;
    assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) &&
           (rounded_up % sizeof(void*)) == 0);

    return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/,
                        [this, rounded_up, huge_page_size, logger]() {
                          return arena_.AllocateAligned(rounded_up,
                                                        huge_page_size, logger);
                        });
  }

  size_t ApproximateMemoryUsage() const {
    std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
    lock.lock();
    return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
  }

  size_t MemoryAllocatedBytes() const {
    return memory_allocated_bytes_.load(std::memory_order_relaxed);
  }

  size_t AllocatedAndUnused() const {
    return arena_allocated_and_unused_.load(std::memory_order_relaxed) +
           ShardAllocatedAndUnused();
  }

  size_t IrregularBlockNum() const {
    return irregular_block_num_.load(std::memory_order_relaxed);
  }

  size_t BlockSize() const override { return arena_.BlockSize(); }

 private:
  struct Shard {
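    // Padding keeps each Shard filling its own CPU cache line, so per-core
    // shards do not contend with each other through false sharing.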
    char padding[40] ROCKSDB_FIELD_UNUSED;
    mutable SpinMutex mutex;
    char* free_begin_;
    std::atomic<size_t> allocated_and_unused_;

    Shard() : free_begin_(nullptr), allocated_and_unused_(0) {}
  };

#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
  static __thread size_t tls_cpuid;
#else
  enum ZeroFirstEnum : size_t { tls_cpuid = 0 };
#endif

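  // Padding separates the frequently-updated fields below from whatever
  // precedes this object in memory, limiting cache-line interference.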
  char padding0[56] ROCKSDB_FIELD_UNUSED;

  size_t shard_block_size_;

  CoreLocalArray<Shard> shards_;

  Arena arena_;
  mutable SpinMutex arena_mutex_;
  std::atomic<size_t> arena_allocated_and_unused_;
  std::atomic<size_t> memory_allocated_bytes_;
  std::atomic<size_t> irregular_block_num_;

  char padding1[56] ROCKSDB_FIELD_UNUSED;

  Shard* Repick();

  size_t ShardAllocatedAndUnused() const {
    size_t total = 0;
    for (size_t i = 0; i < shards_.Size(); ++i) {
      total += shards_.AccessAtCore(i)->allocated_and_unused_.load(
          std::memory_order_relaxed);
    }
    return total;
  }

  template <typename Func>
  char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
    size_t cpu;

    // Go directly to the arena if the allocation is too large, or if
    // we've never needed to Repick() and the arena mutex is available
    // with no waiting. This keeps the fragmentation penalty of
    // concurrency zero unless it might actually confer an advantage.
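    // Note the short-circuit order below: arena_lock.try_lock() is attempted
    // only when this thread has never repicked (tls_cpuid == 0) and shard 0
    // has no cached space, so a thread already using the per-core shards does
    // not touch the arena spinlock here.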
    std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
    if (bytes > shard_block_size_ / 4 || force_arena ||
        ((cpu = tls_cpuid) == 0 &&
         !shards_.AccessAtCore(0)->allocated_and_unused_.load(
             std::memory_order_relaxed) &&
         arena_lock.try_lock())) {
      if (!arena_lock.owns_lock()) {
        arena_lock.lock();
      }
      auto rv = func();
      Fixup();
      return rv;
    }

    // pick a shard from which to allocate
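    // The mask below assumes shards_.Size() is a power of two, so any cached
    // cpu id maps onto a valid shard index.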
    Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));
    if (!s->mutex.try_lock()) {
      s = Repick();
      s->mutex.lock();
    }
    std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);

    size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
    if (avail < bytes) {
      // reload
      std::lock_guard<SpinMutex> reload_lock(arena_mutex_);

      // If the arena's current block is within a factor of 2 of the right
      // size, we adjust our request to avoid arena waste.
      auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
      assert(exact == arena_.AllocatedAndUnused());

      if (exact >= bytes && arena_.IsInInlineBlock()) {
        // If we haven't exhausted arena's inline block yet, allocate from arena
        // directly. This ensures that we'll do the first few small allocations
        // without allocating any blocks.
        // In particular this prevents empty memtables from using a
        // disproportionately large amount of memory: a memtable allocates on
        // the order of 1 KB of memory when created; we wouldn't want to
        // allocate a full arena block (typically a few megabytes) for that,
        // especially if there are thousands of empty memtables.
        auto rv = func();
        Fixup();
        return rv;
      }

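      // Request exactly what the arena has left when that is between half and
      // twice the shard block size (consuming the current arena block without
      // waste); otherwise request a fresh shard_block_size_ chunk.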
      avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
                  ? exact
                  : shard_block_size_;
      s->free_begin_ = arena_.AllocateAligned(avail);
      Fixup();
    }
    s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);

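    // Pointer-aligned requests are carved from the front of the shard's
    // block, which stays aligned because it only ever advances by multiples
    // of sizeof(void*); unaligned requests are taken from the tail so they
    // never disturb that alignment.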
    char* rv;
    if ((bytes % sizeof(void*)) == 0) {
      // aligned allocation from the beginning
      rv = s->free_begin_;
      s->free_begin_ += bytes;
    } else {
      // unaligned from the end
      rv = s->free_begin_ + avail - bytes;
    }
    return rv;
  }

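  // Copy the arena's counters into the atomics so that the lock-free getters
  // above (MemoryAllocatedBytes(), AllocatedAndUnused(), IrregularBlockNum())
  // see fresh values.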
  void Fixup() {
    arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(),
                                      std::memory_order_relaxed);
    memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(),
                                  std::memory_order_relaxed);
    irregular_block_num_.store(arena_.IrregularBlockNum(),
                               std::memory_order_relaxed);
  }

  ConcurrentArena(const ConcurrentArena&) = delete;
  ConcurrentArena& operator=(const ConcurrentArena&) = delete;
};

}  // namespace ROCKSDB_NAMESPACE