]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/java/memory/memory-netty/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / memory / memory-netty / src / main / java / io / netty / buffer / PooledByteBufAllocatorL.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package io.netty.buffer;
19
20 import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;
21
22 import java.lang.reflect.Field;
23 import java.nio.ByteBuffer;
24 import java.util.concurrent.atomic.AtomicLong;
25
26 import org.apache.arrow.memory.OutOfMemoryException;
27 import org.apache.arrow.memory.util.LargeMemoryUtil;
28
29 import io.netty.util.internal.OutOfDirectMemoryError;
30 import io.netty.util.internal.StringUtil;
31
32 /**
33 * The base allocator that we use for all of Arrow's memory management. Returns
34 * UnsafeDirectLittleEndian buffers.
35 */
36 public class PooledByteBufAllocatorL {
37
38 private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator");
39
40 private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
41 public final UnsafeDirectLittleEndian empty;
42 private final AtomicLong hugeBufferSize = new AtomicLong(0);
43 private final AtomicLong hugeBufferCount = new AtomicLong(0);
44 private final AtomicLong normalBufferSize = new AtomicLong(0);
45 private final AtomicLong normalBufferCount = new AtomicLong(0);
46 private final InnerAllocator allocator;
47
48 public PooledByteBufAllocatorL() {
49 allocator = new InnerAllocator();
50 empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
51 }
52
53 /**
54 * Returns a {@linkplain io.netty.buffer.UnsafeDirectLittleEndian} of the given size.
55 */
56 public UnsafeDirectLittleEndian allocate(long size) {
57 try {
58 return allocator.directBuffer(LargeMemoryUtil.checkedCastToInt(size), Integer.MAX_VALUE);
59 } catch (OutOfMemoryError e) {
60 /*
61 * OutOfDirectMemoryError is thrown by Netty when we exceed the direct memory limit defined by
62 * -XX:MaxDirectMemorySize. OutOfMemoryError with "Direct buffer memory" message is thrown by
63 * java.nio.Bits when we exceed the direct memory limit. This should never be hit in practice
64 * as Netty is expected to throw an OutOfDirectMemoryError first.
65 */
66 if (e instanceof OutOfDirectMemoryError || "Direct buffer memory".equals(e.getMessage())) {
67 throw new OutOfMemoryException("Failure allocating buffer.", e);
68 }
69 throw e;
70 }
71 }
72
73 public int getChunkSize() {
74 return allocator.chunkSize;
75 }
76
77 public long getHugeBufferSize() {
78 return hugeBufferSize.get();
79 }
80
81 public long getHugeBufferCount() {
82 return hugeBufferCount.get();
83 }
84
85 public long getNormalBufferSize() {
86 return normalBufferSize.get();
87 }
88
89 public long getNormalBufferCount() {
90 return normalBufferSize.get();
91 }
92
93 private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian {
94
95 private final long initialCapacity;
96 private final AtomicLong count;
97 private final AtomicLong size;
98
99 private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) {
100 super(buf);
101 this.initialCapacity = buf.capacity();
102 this.count = count;
103 this.size = size;
104 }
105
106 private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count,
107 AtomicLong size) {
108 super(buf);
109 this.initialCapacity = buf.capacity();
110 this.count = count;
111 this.size = size;
112 }
113
114 @Override
115 public ByteBuf copy() {
116 throw new UnsupportedOperationException("copy method is not supported");
117 }
118
119 @Override
120 public ByteBuf copy(int index, int length) {
121 throw new UnsupportedOperationException("copy method is not supported");
122 }
123
124 @Override
125 public boolean release(int decrement) {
126 boolean released = super.release(decrement);
127 if (released) {
128 count.decrementAndGet();
129 size.addAndGet(-initialCapacity);
130 }
131 return released;
132 }
133
134 }
135
136 private class InnerAllocator extends PooledByteBufAllocator {
137
138 private final PoolArena<ByteBuffer>[] directArenas;
139 private final MemoryStatusThread statusThread;
140 private final int chunkSize;
141
142 public InnerAllocator() {
143 super(true);
144
145 try {
146 Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas");
147 f.setAccessible(true);
148 this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
149 } catch (Exception e) {
150 throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e);
151 }
152
153 this.chunkSize = directArenas[0].chunkSize;
154
155 if (memoryLogger.isTraceEnabled()) {
156 statusThread = new MemoryStatusThread();
157 statusThread.start();
158 } else {
159 statusThread = null;
160 }
161 }
162
163 private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) {
164 PoolThreadCache cache = threadCache();
165 PoolArena<ByteBuffer> directArena = cache.directArena;
166
167 if (directArena != null) {
168
169 if (initialCapacity > directArena.chunkSize) {
170 // This is beyond chunk size so we'll allocate separately.
171 ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
172
173 hugeBufferSize.addAndGet(buf.capacity());
174 hugeBufferCount.incrementAndGet();
175
176 // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
177 return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount,
178 hugeBufferSize);
179 } else {
180 // within chunk, use arena.
181 ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
182 if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
183 fail();
184 }
185
186 if (!ASSERT_ENABLED) {
187 return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf);
188 }
189
190 normalBufferSize.addAndGet(buf.capacity());
191 normalBufferCount.incrementAndGet();
192
193 return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf,
194 normalBufferCount, normalBufferSize);
195 }
196
197 } else {
198 throw fail();
199 }
200 }
201
202 private UnsupportedOperationException fail() {
203 return new UnsupportedOperationException(
204 "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform " +
205 "didn't provide that functionality.");
206 }
207
208 @Override
209 public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) {
210 if (initialCapacity == 0 && maxCapacity == 0) {
211 newDirectBuffer(initialCapacity, maxCapacity);
212 }
213 validate(initialCapacity, maxCapacity);
214 return newDirectBufferL(initialCapacity, maxCapacity);
215 }
216
217 @Override
218 public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
219 throw new UnsupportedOperationException("Arrow doesn't support using heap buffers.");
220 }
221
222
223 private void validate(int initialCapacity, int maxCapacity) {
224 if (initialCapacity < 0) {
225 throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: 0+)");
226 }
227 if (initialCapacity > maxCapacity) {
228 throw new IllegalArgumentException(String.format(
229 "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
230 initialCapacity, maxCapacity));
231 }
232 }
233
234 @Override
235 public String toString() {
236 StringBuilder buf = new StringBuilder();
237 buf.append(directArenas.length);
238 buf.append(" direct arena(s):");
239 buf.append(StringUtil.NEWLINE);
240 for (PoolArena<ByteBuffer> a : directArenas) {
241 buf.append(a);
242 }
243
244 buf.append("Large buffers outstanding: ");
245 buf.append(hugeBufferCount.get());
246 buf.append(" totaling ");
247 buf.append(hugeBufferSize.get());
248 buf.append(" bytes.");
249 buf.append('\n');
250 buf.append("Normal buffers outstanding: ");
251 buf.append(normalBufferCount.get());
252 buf.append(" totaling ");
253 buf.append(normalBufferSize.get());
254 buf.append(" bytes.");
255 return buf.toString();
256 }
257
258 private class MemoryStatusThread extends Thread {
259
260 public MemoryStatusThread() {
261 super("allocation.logger");
262 this.setDaemon(true);
263 }
264
265 @Override
266 public void run() {
267 while (true) {
268 memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
269 try {
270 Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
271 } catch (InterruptedException e) {
272 return;
273 }
274 }
275 }
276 }
277
278
279 }
280 }