// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once

#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>

#include "port/likely.h"
#include "util/allocator.h"
#include "util/arena.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"

// Apply the unused-field attribute to the padding arrays only under clang
// (the compiler that warns about them); with GCC 4.8.1 the attribute makes
// the build fail.
#ifdef __clang__
#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
#else
#define ROCKSDB_FIELD_UNUSED
#endif  // __clang__

namespace rocksdb {

class Logger;

// ConcurrentArena wraps an Arena. It makes it thread safe using a fast
// inlined spinlock, and adds small per-core allocation caches to avoid
// contention for small allocations. To avoid any memory waste from the
// per-core shards, they are kept small, they are lazily instantiated
// only if ConcurrentArena actually notices concurrent use, and they
// adjust their size so that there is no fragmentation waste when the
// shard blocks are allocated from the underlying main arena.
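//
// A minimal usage sketch (illustrative only; the block size below is an
// arbitrary example value). The arena can be shared by multiple threads
// without external locking:
//
//   ConcurrentArena arena(1 << 20 /* block_size */);
//   char* p = arena.Allocate(64);          // safe from any thread
//   char* q = arena.AllocateAligned(128);  // pointer-aligned result
//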
class ConcurrentArena : public Allocator {
 public:
  // block_size and huge_page_size are the same as for Arena (and are
  // in fact just passed to the constructor of arena_). The core-local
  // shards compute their shard_block_size as a fraction of block_size
  // that varies according to the hardware concurrency level.
  explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
                           size_t huge_page_size = 0);

  char* Allocate(size_t bytes) override {
    return AllocateImpl(bytes, false /*force_arena*/,
                        [=]() { return arena_.Allocate(bytes); });
  }

  char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
                        Logger* logger = nullptr) override {
    size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1;
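    // Worked example: with 8-byte pointers, a request for 13 bytes becomes
    // ((13 - 1) | 7) + 1 == 16, i.e. the size rounded up to a multiple of
    // sizeof(void*).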
    assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) &&
           (rounded_up % sizeof(void*)) == 0);

    return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/, [=]() {
      return arena_.AllocateAligned(rounded_up, huge_page_size, logger);
    });
  }

  size_t ApproximateMemoryUsage() const {
    std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
    if (index_mask_ != 0) {
      lock.lock();
    }
    return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
  }

  size_t MemoryAllocatedBytes() const {
    return memory_allocated_bytes_.load(std::memory_order_relaxed);
  }

  size_t AllocatedAndUnused() const {
    return arena_allocated_and_unused_.load(std::memory_order_relaxed) +
           ShardAllocatedAndUnused();
  }

  size_t IrregularBlockNum() const {
    return irregular_block_num_.load(std::memory_order_relaxed);
  }

  size_t BlockSize() const override { return arena_.BlockSize(); }

 private:
  // Per-core allocation cache: a slice carved out of the main arena.
  struct Shard {
    char padding[40] ROCKSDB_FIELD_UNUSED;  // keep shards on separate cache lines
    mutable SpinMutex mutex;
    char* free_begin_;
    std::atomic<size_t> allocated_and_unused_;

    Shard() : allocated_and_unused_(0) {}
  };

#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
  // Cached shard hint for the calling thread; 0 means Repick() has not
  // been needed yet.
  static __thread uint32_t tls_cpuid;
#else
  enum ZeroFirstEnum : uint32_t { tls_cpuid = 0 };
#endif

  char padding0[56] ROCKSDB_FIELD_UNUSED;

  size_t shard_block_size_;

  // shards_[i & index_mask_] is valid
  size_t index_mask_;
  std::unique_ptr<Shard[]> shards_;

  Arena arena_;
  mutable SpinMutex arena_mutex_;
  std::atomic<size_t> arena_allocated_and_unused_;
  std::atomic<size_t> memory_allocated_bytes_;
  std::atomic<size_t> irregular_block_num_;

  char padding1[56] ROCKSDB_FIELD_UNUSED;

  // Picks a new shard for the calling thread.
  Shard* Repick();

  size_t ShardAllocatedAndUnused() const {
    size_t total = 0;
    for (size_t i = 0; i <= index_mask_; ++i) {
      total +=
          shards_[i].allocated_and_unused_.load(std::memory_order_relaxed);
    }
    return total;
  }

  template <typename Func>
  char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
    uint32_t cpu;

    // Go directly to the arena if the allocation is too large, or if
    // we've never needed to Repick() and the arena mutex is available
    // with no waiting. This keeps the fragmentation penalty of
    // concurrency zero unless it might actually confer an advantage.
    std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
    if (bytes > shard_block_size_ / 4 || force_arena ||
        ((cpu = tls_cpuid) == 0 &&
         !shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) &&
         arena_lock.try_lock())) {
      if (!arena_lock.owns_lock()) {
        arena_lock.lock();
      }
      auto rv = func();
      Fixup();
      return rv;
    }

    // pick a shard from which to allocate
    Shard* s = &shards_[cpu & index_mask_];
    if (!s->mutex.try_lock()) {
      s = Repick();
      s->mutex.lock();
    }
    std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);

    size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
    if (avail < bytes) {
      // reload the shard's cache from the main arena
      std::lock_guard<SpinMutex> reload_lock(arena_mutex_);

      // If the arena's current block is within a factor of 2 of the right
      // size, we adjust our request to avoid arena waste.
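      // For example, if shard_block_size_ is 32 KB and the arena currently
      // has 40 KB allocated-and-unused, the shard takes the whole 40 KB
      // rather than 32 KB, so no 8 KB stub is left stranded in the arena.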
      auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
      assert(exact == arena_.AllocatedAndUnused());
      avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
                  ? exact
                  : shard_block_size_;
      s->free_begin_ = arena_.AllocateAligned(avail);
      Fixup();
    }
    s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);

    char* rv;
    if ((bytes % sizeof(void*)) == 0) {
      // aligned allocation from the beginning
      rv = s->free_begin_;
      s->free_begin_ += bytes;
    } else {
      // unaligned from the end
      rv = s->free_begin_ + avail - bytes;
    }
    return rv;
  }

  // Refresh the cached copies of the arena's statistics; callers hold
  // arena_mutex_ while the arena is modified.
  void Fixup() {
    arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(),
                                      std::memory_order_relaxed);
    memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(),
                                  std::memory_order_relaxed);
    irregular_block_num_.store(arena_.IrregularBlockNum(),
                               std::memory_order_relaxed);
  }

  ConcurrentArena(const ConcurrentArena&) = delete;
  ConcurrentArena& operator=(const ConcurrentArena&) = delete;
};

}  // namespace rocksdb