2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package io
.netty
.buffer
;
20 import static org
.apache
.arrow
.memory
.util
.AssertionUtil
.ASSERT_ENABLED
;
22 import java
.lang
.reflect
.Field
;
23 import java
.nio
.ByteBuffer
;
24 import java
.util
.concurrent
.atomic
.AtomicLong
;
26 import org
.apache
.arrow
.memory
.OutOfMemoryException
;
27 import org
.apache
.arrow
.memory
.util
.LargeMemoryUtil
;
29 import io
.netty
.util
.internal
.OutOfDirectMemoryError
;
30 import io
.netty
.util
.internal
.StringUtil
;
33 * The base allocator that we use for all of Arrow's memory management. Returns
34 * UnsafeDirectLittleEndian buffers.
36 public class PooledByteBufAllocatorL
{
38 private static final org
.slf4j
.Logger memoryLogger
= org
.slf4j
.LoggerFactory
.getLogger("arrow.allocator");
40 private static final int MEMORY_LOGGER_FREQUENCY_SECONDS
= 60;
41 public final UnsafeDirectLittleEndian empty
;
42 private final AtomicLong hugeBufferSize
= new AtomicLong(0);
43 private final AtomicLong hugeBufferCount
= new AtomicLong(0);
44 private final AtomicLong normalBufferSize
= new AtomicLong(0);
45 private final AtomicLong normalBufferCount
= new AtomicLong(0);
46 private final InnerAllocator allocator
;
48 public PooledByteBufAllocatorL() {
49 allocator
= new InnerAllocator();
50 empty
= new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled
.EMPTY_BUFFER
));
54 * Returns a {@linkplain io.netty.buffer.UnsafeDirectLittleEndian} of the given size.
56 public UnsafeDirectLittleEndian
allocate(long size
) {
58 return allocator
.directBuffer(LargeMemoryUtil
.checkedCastToInt(size
), Integer
.MAX_VALUE
);
59 } catch (OutOfMemoryError e
) {
61 * OutOfDirectMemoryError is thrown by Netty when we exceed the direct memory limit defined by
62 * -XX:MaxDirectMemorySize. OutOfMemoryError with "Direct buffer memory" message is thrown by
63 * java.nio.Bits when we exceed the direct memory limit. This should never be hit in practice
64 * as Netty is expected to throw an OutOfDirectMemoryError first.
66 if (e
instanceof OutOfDirectMemoryError
|| "Direct buffer memory".equals(e
.getMessage())) {
67 throw new OutOfMemoryException("Failure allocating buffer.", e
);
73 public int getChunkSize() {
74 return allocator
.chunkSize
;
77 public long getHugeBufferSize() {
78 return hugeBufferSize
.get();
81 public long getHugeBufferCount() {
82 return hugeBufferCount
.get();
85 public long getNormalBufferSize() {
86 return normalBufferSize
.get();
89 public long getNormalBufferCount() {
90 return normalBufferSize
.get();
93 private static class AccountedUnsafeDirectLittleEndian
extends UnsafeDirectLittleEndian
{
95 private final long initialCapacity
;
96 private final AtomicLong count
;
97 private final AtomicLong size
;
99 private AccountedUnsafeDirectLittleEndian(LargeBuffer buf
, AtomicLong count
, AtomicLong size
) {
101 this.initialCapacity
= buf
.capacity();
106 private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf
, AtomicLong count
,
109 this.initialCapacity
= buf
.capacity();
115 public ByteBuf
copy() {
116 throw new UnsupportedOperationException("copy method is not supported");
120 public ByteBuf
copy(int index
, int length
) {
121 throw new UnsupportedOperationException("copy method is not supported");
125 public boolean release(int decrement
) {
126 boolean released
= super.release(decrement
);
128 count
.decrementAndGet();
129 size
.addAndGet(-initialCapacity
);
136 private class InnerAllocator
extends PooledByteBufAllocator
{
138 private final PoolArena
<ByteBuffer
>[] directArenas
;
139 private final MemoryStatusThread statusThread
;
140 private final int chunkSize
;
142 public InnerAllocator() {
146 Field f
= PooledByteBufAllocator
.class.getDeclaredField("directArenas");
147 f
.setAccessible(true);
148 this.directArenas
= (PoolArena
<ByteBuffer
>[]) f
.get(this);
149 } catch (Exception e
) {
150 throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e
);
153 this.chunkSize
= directArenas
[0].chunkSize
;
155 if (memoryLogger
.isTraceEnabled()) {
156 statusThread
= new MemoryStatusThread();
157 statusThread
.start();
163 private UnsafeDirectLittleEndian
newDirectBufferL(int initialCapacity
, int maxCapacity
) {
164 PoolThreadCache cache
= threadCache();
165 PoolArena
<ByteBuffer
> directArena
= cache
.directArena
;
167 if (directArena
!= null) {
169 if (initialCapacity
> directArena
.chunkSize
) {
170 // This is beyond chunk size so we'll allocate separately.
171 ByteBuf buf
= UnpooledByteBufAllocator
.DEFAULT
.directBuffer(initialCapacity
, maxCapacity
);
173 hugeBufferSize
.addAndGet(buf
.capacity());
174 hugeBufferCount
.incrementAndGet();
176 // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
177 return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf
), hugeBufferCount
,
180 // within chunk, use arena.
181 ByteBuf buf
= directArena
.allocate(cache
, initialCapacity
, maxCapacity
);
182 if (!(buf
instanceof PooledUnsafeDirectByteBuf
)) {
186 if (!ASSERT_ENABLED
) {
187 return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf
) buf
);
190 normalBufferSize
.addAndGet(buf
.capacity());
191 normalBufferCount
.incrementAndGet();
193 return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf
) buf
,
194 normalBufferCount
, normalBufferSize
);
202 private UnsupportedOperationException
fail() {
203 return new UnsupportedOperationException(
204 "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform " +
205 "didn't provide that functionality.");
209 public UnsafeDirectLittleEndian
directBuffer(int initialCapacity
, int maxCapacity
) {
210 if (initialCapacity
== 0 && maxCapacity
== 0) {
211 newDirectBuffer(initialCapacity
, maxCapacity
);
213 validate(initialCapacity
, maxCapacity
);
214 return newDirectBufferL(initialCapacity
, maxCapacity
);
218 public ByteBuf
heapBuffer(int initialCapacity
, int maxCapacity
) {
219 throw new UnsupportedOperationException("Arrow doesn't support using heap buffers.");
223 private void validate(int initialCapacity
, int maxCapacity
) {
224 if (initialCapacity
< 0) {
225 throw new IllegalArgumentException("initialCapacity: " + initialCapacity
+ " (expected: 0+)");
227 if (initialCapacity
> maxCapacity
) {
228 throw new IllegalArgumentException(String
.format(
229 "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
230 initialCapacity
, maxCapacity
));
235 public String
toString() {
236 StringBuilder buf
= new StringBuilder();
237 buf
.append(directArenas
.length
);
238 buf
.append(" direct arena(s):");
239 buf
.append(StringUtil
.NEWLINE
);
240 for (PoolArena
<ByteBuffer
> a
: directArenas
) {
244 buf
.append("Large buffers outstanding: ");
245 buf
.append(hugeBufferCount
.get());
246 buf
.append(" totaling ");
247 buf
.append(hugeBufferSize
.get());
248 buf
.append(" bytes.");
250 buf
.append("Normal buffers outstanding: ");
251 buf
.append(normalBufferCount
.get());
252 buf
.append(" totaling ");
253 buf
.append(normalBufferSize
.get());
254 buf
.append(" bytes.");
255 return buf
.toString();
258 private class MemoryStatusThread
extends Thread
{
260 public MemoryStatusThread() {
261 super("allocation.logger");
262 this.setDaemon(true);
268 memoryLogger
.trace("Memory Usage: \n{}", PooledByteBufAllocatorL
.this.toString());
270 Thread
.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS
* 1000);
271 } catch (InterruptedException e
) {