git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / memory / memory-core / src / main / java / org / apache / arrow / memory / BaseAllocator.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.arrow.memory;
19
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.HashSet;
23 import java.util.IdentityHashMap;
24 import java.util.Map;
25 import java.util.Set;
26
27 import org.apache.arrow.memory.rounding.DefaultRoundingPolicy;
28 import org.apache.arrow.memory.rounding.RoundingPolicy;
29 import org.apache.arrow.memory.util.AssertionUtil;
30 import org.apache.arrow.memory.util.CommonUtil;
31 import org.apache.arrow.memory.util.HistoricalLog;
32 import org.apache.arrow.util.Preconditions;
33 import org.immutables.value.Value;
34
35 /**
36 * A base-class that implements all functionality of {@linkplain BufferAllocator}s.
37 *
38 * <p>The class is abstract to enforce usage of {@linkplain RootAllocator}/{@linkplain ChildAllocator}
39 * facades.
40 */
41 abstract class BaseAllocator extends Accountant implements BufferAllocator {
42
/** Name of the system property that, when set, decides the value of {@link #DEBUG}. */
public static final String DEBUG_ALLOCATOR = "arrow.memory.debug.allocator";
/** First argument handed to each allocator's {@link HistoricalLog} when {@link #DEBUG} is on. */
public static final int DEBUG_LOG_LENGTH = 6;
/** Whether debug bookkeeping is active; assigned exactly once by the static initializer below. */
public static final boolean DEBUG;
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator.class);

// DEBUG has to be settled before DEFAULT_CONFIG is built: building DEFAULT_CONFIG eventually
// initializes the allocation manager, which allocates an ArrowBuf, which in turn reads DEBUG.
static {
  final String configured = System.getProperty(DEBUG_ALLOCATOR);
  // An explicitly set system property wins; otherwise fall back to AssertionUtil.
  DEBUG = configured != null
      ? Boolean.parseBoolean(configured)
      : AssertionUtil.isAssertionsEnabled();
  logger.info("Debug mode " + (DEBUG ? "enabled." : "disabled."));
}

/** Configuration built without overriding any option. */
public static final Config DEFAULT_CONFIG = ImmutableConfig.builder().build();
62
63 // Package exposed for sharing between AllocatorManger and BaseAllocator objects
64 private final String name;
65 private final RootAllocator root;
66 private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
67 private final AllocationListener listener;
68 private final BaseAllocator parentAllocator;
69 private final Map<BaseAllocator, Object> childAllocators;
70 private final ArrowBuf empty;
71 // members used purely for debugging
72 private final IdentityHashMap<BufferLedger, Object> childLedgers;
73 private final IdentityHashMap<Reservation, Object> reservations;
74 private final HistoricalLog historicalLog;
75 private final RoundingPolicy roundingPolicy;
76 private final AllocationManager.Factory allocationManagerFactory;
77
78 private volatile boolean isClosed = false; // the allocator has been closed
79
/**
 * Creates an allocator and wires it into the allocator tree.
 *
 * <p>An allocator with a parent shares that parent's root and empty buffer. An allocator
 * without a parent must itself be a {@link RootAllocator}; it becomes its own root and
 * creates the empty buffer.
 *
 * @param parentAllocator parent allocator, or {@code null} when defining a root allocator
 * @param name name of this allocator
 * @param config supplies the listener, allocation-manager factory, rounding policy,
 *        initial reservation and maximum allocation
 * @throws IllegalStateException if there is no parent and this is not a {@link RootAllocator}
 * @see Config
 */
protected BaseAllocator(
    final BaseAllocator parentAllocator,
    final String name,
    final Config config) throws OutOfMemoryException {
  super(parentAllocator, name, config.getInitReservation(), config.getMaxAllocation());

  this.name = name;
  this.parentAllocator = parentAllocator;
  this.listener = config.getListener();
  // Must be assigned before createEmpty() runs below, because createEmpty() reads it.
  this.allocationManagerFactory = config.getAllocationManagerFactory();

  if (parentAllocator != null) {
    this.root = parentAllocator.root;
    this.empty = parentAllocator.empty;
  } else if (this instanceof RootAllocator) {
    this.root = (RootAllocator) this;
    this.empty = createEmpty();
  } else {
    throw new IllegalStateException("An parent allocator must either carry a root or be the " +
        "root.");
  }

  this.childAllocators = Collections.synchronizedMap(new IdentityHashMap<>());

  if (!DEBUG) {
    this.reservations = null;
    this.childLedgers = null;
    this.historicalLog = null;
  } else {
    this.reservations = new IdentityHashMap<>();
    this.childLedgers = new IdentityHashMap<>();
    this.historicalLog = new HistoricalLog(DEBUG_LOG_LENGTH, "allocator[%s]", name);
    // hist() writes to historicalLog, so it can only be called once the log exists.
    hist("created by \"%s\", owned = %d", name, this.getAllocatedMemory());
  }
  this.roundingPolicy = config.getRoundingPolicy();
}
126
127 @Override
128 public AllocationListener getListener() {
129 return listener;
130 }
131
132 @Override
133 public BaseAllocator getParentAllocator() {
134 return parentAllocator;
135 }
136
137 @Override
138 public Collection<BufferAllocator> getChildAllocators() {
139 synchronized (childAllocators) {
140 return new HashSet<>(childAllocators.keySet());
141 }
142 }
143
144 private static String createErrorMsg(final BufferAllocator allocator, final long rounded, final long requested) {
145 if (rounded != requested) {
146 return String.format(
147 "Unable to allocate buffer of size %d (rounded from %d) due to memory limit. Current " +
148 "allocation: %d", rounded, requested, allocator.getAllocatedMemory());
149 } else {
150 return String.format(
151 "Unable to allocate buffer of size %d due to memory limit. Current " +
152 "allocation: %d", rounded, allocator.getAllocatedMemory());
153 }
154 }
155
156 public static boolean isDebug() {
157 return DEBUG;
158 }
159
160 @Override
161 public void assertOpen() {
162 if (AssertionUtil.ASSERT_ENABLED) {
163 if (isClosed) {
164 throw new IllegalStateException("Attempting operation on allocator when allocator is closed.\n" +
165 toVerboseString());
166 }
167 }
168 }
169
170 @Override
171 public String getName() {
172 return name;
173 }
174
175 @Override
176 public ArrowBuf getEmpty() {
177 return empty;
178 }
179
180 /**
181 * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that
182 * we have a new ledger
183 * associated with this allocator.
184 */
185 void associateLedger(BufferLedger ledger) {
186 assertOpen();
187 if (DEBUG) {
188 synchronized (DEBUG_LOCK) {
189 childLedgers.put(ledger, null);
190 }
191 }
192 }
193
194 /**
195 * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that
196 * we are removing a
197 * ledger associated with this allocator
198 */
199 void dissociateLedger(BufferLedger ledger) {
200 assertOpen();
201 if (DEBUG) {
202 synchronized (DEBUG_LOCK) {
203 if (!childLedgers.containsKey(ledger)) {
204 throw new IllegalStateException("Trying to remove a child ledger that doesn't exist.");
205 }
206 childLedgers.remove(ledger);
207 }
208 }
209 }
210
211 /**
212 * Track when a ChildAllocator of this BaseAllocator is closed. Used for debugging purposes.
213 *
214 * @param childAllocator The child allocator that has been closed.
215 */
216 private void childClosed(final BaseAllocator childAllocator) {
217 assertOpen();
218
219 if (DEBUG) {
220 Preconditions.checkArgument(childAllocator != null, "child allocator can't be null");
221
222 synchronized (DEBUG_LOCK) {
223 final Object object = childAllocators.remove(childAllocator);
224 if (object == null) {
225 childAllocator.historicalLog.logHistory(logger);
226 throw new IllegalStateException("Child allocator[" + childAllocator.name +
227 "] not found in parent allocator[" + name + "]'s childAllocators");
228 }
229 }
230 } else {
231 childAllocators.remove(childAllocator);
232 }
233 listener.onChildRemoved(this, childAllocator);
234 }
235
236 @Override
237 public ArrowBuf buffer(final long initialRequestSize) {
238 assertOpen();
239
240 return buffer(initialRequestSize, null);
241 }
242
243 private ArrowBuf createEmpty() {
244 return allocationManagerFactory.empty();
245 }
246
247 @Override
248 public ArrowBuf buffer(final long initialRequestSize, BufferManager manager) {
249 assertOpen();
250
251 Preconditions.checkArgument(initialRequestSize >= 0, "the requested size must be non-negative");
252
253 if (initialRequestSize == 0) {
254 return getEmpty();
255 }
256
257 // round the request size according to the rounding policy
258 final long actualRequestSize = roundingPolicy.getRoundedSize(initialRequestSize);
259
260 listener.onPreAllocation(actualRequestSize);
261
262 AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
263 if (!outcome.isOk()) {
264 if (listener.onFailedAllocation(actualRequestSize, outcome)) {
265 // Second try, in case the listener can do something about it
266 outcome = this.allocateBytes(actualRequestSize);
267 }
268 if (!outcome.isOk()) {
269 throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize,
270 initialRequestSize), outcome.getDetails());
271 }
272 }
273
274 boolean success = false;
275 try {
276 ArrowBuf buffer = bufferWithoutReservation(actualRequestSize, manager);
277 success = true;
278 listener.onAllocation(actualRequestSize);
279 return buffer;
280 } catch (OutOfMemoryError e) {
281 throw e;
282 } finally {
283 if (!success) {
284 releaseBytes(actualRequestSize);
285 }
286 }
287 }
288
289 /**
290 * Used by usual allocation as well as for allocating a pre-reserved buffer.
291 * Skips the typical accounting associated with creating a new buffer.
292 */
293 private ArrowBuf bufferWithoutReservation(
294 final long size,
295 BufferManager bufferManager) throws OutOfMemoryException {
296 assertOpen();
297
298 final AllocationManager manager = newAllocationManager(size);
299 final BufferLedger ledger = manager.associate(this); // +1 ref cnt (required)
300 final ArrowBuf buffer = ledger.newArrowBuf(size, bufferManager);
301
302 // make sure that our allocation is equal to what we expected.
303 Preconditions.checkArgument(buffer.capacity() == size,
304 "Allocated capacity %d was not equal to requested capacity %d.", buffer.capacity(), size);
305
306 return buffer;
307 }
308
309 private AllocationManager newAllocationManager(long size) {
310 return newAllocationManager(this, size);
311 }
312
313
314 private AllocationManager newAllocationManager(BaseAllocator accountingAllocator, long size) {
315 return allocationManagerFactory.create(accountingAllocator, size);
316 }
317
318 @Override
319 public BufferAllocator getRoot() {
320 return root;
321 }
322
323 @Override
324 public BufferAllocator newChildAllocator(
325 final String name,
326 final long initReservation,
327 final long maxAllocation) {
328 return newChildAllocator(name, this.listener, initReservation, maxAllocation);
329 }
330
331 @Override
332 public BufferAllocator newChildAllocator(
333 final String name,
334 final AllocationListener listener,
335 final long initReservation,
336 final long maxAllocation) {
337 assertOpen();
338
339 final ChildAllocator childAllocator =
340 new ChildAllocator(this, name, configBuilder()
341 .listener(listener)
342 .initReservation(initReservation)
343 .maxAllocation(maxAllocation)
344 .roundingPolicy(roundingPolicy)
345 .allocationManagerFactory(allocationManagerFactory)
346 .build());
347
348 if (DEBUG) {
349 synchronized (DEBUG_LOCK) {
350 childAllocators.put(childAllocator, childAllocator);
351 historicalLog.recordEvent("allocator[%s] created new child allocator[%s]", name,
352 childAllocator.getName());
353 }
354 } else {
355 childAllocators.put(childAllocator, childAllocator);
356 }
357 this.listener.onChildAdded(this, childAllocator);
358
359 return childAllocator;
360 }
361
362 @Override
363 public AllocationReservation newReservation() {
364 assertOpen();
365
366 return new Reservation();
367 }
368
369 @Override
370 public synchronized void close() {
371 /*
372 * Some owners may close more than once because of complex cleanup and shutdown
373 * procedures.
374 */
375 if (isClosed) {
376 return;
377 }
378
379 isClosed = true;
380
381 StringBuilder outstandingChildAllocators = new StringBuilder();
382 if (DEBUG) {
383 synchronized (DEBUG_LOCK) {
384 verifyAllocator();
385
386 // are there outstanding child allocators?
387 if (!childAllocators.isEmpty()) {
388 for (final BaseAllocator childAllocator : childAllocators.keySet()) {
389 if (childAllocator.isClosed) {
390 logger.warn(String.format(
391 "Closed child allocator[%s] on parent allocator[%s]'s child list.\n%s",
392 childAllocator.name, name, toString()));
393 }
394 }
395
396 throw new IllegalStateException(
397 String.format("Allocator[%s] closed with outstanding child allocators.\n%s", name,
398 toString()));
399 }
400
401 // are there outstanding buffers?
402 final int allocatedCount = childLedgers.size();
403 if (allocatedCount > 0) {
404 throw new IllegalStateException(
405 String.format("Allocator[%s] closed with outstanding buffers allocated (%d).\n%s",
406 name, allocatedCount, toString()));
407 }
408
409 if (reservations.size() != 0) {
410 throw new IllegalStateException(
411 String.format("Allocator[%s] closed with outstanding reservations (%d).\n%s", name,
412 reservations.size(),
413 toString()));
414 }
415
416 }
417 } else {
418 if (!childAllocators.isEmpty()) {
419 outstandingChildAllocators.append("Outstanding child allocators : \n");
420 synchronized (childAllocators) {
421 for (final BaseAllocator childAllocator : childAllocators.keySet()) {
422 outstandingChildAllocators.append(String.format(" %s", childAllocator.toString()));
423 }
424 }
425 }
426 }
427
428 // Is there unaccounted-for outstanding allocation?
429 final long allocated = getAllocatedMemory();
430 if (allocated > 0) {
431 if (parent != null && reservation > allocated) {
432 parent.releaseBytes(reservation - allocated);
433 }
434 String msg = String.format("Memory was leaked by query. Memory leaked: (%d)\n%s%s", allocated,
435 outstandingChildAllocators.toString(), toString());
436 logger.error(msg);
437 throw new IllegalStateException(msg);
438 }
439
440 // we need to release our memory to our parent before we tell it we've closed.
441 super.close();
442
443 // Inform our parent allocator that we've closed
444 if (parentAllocator != null) {
445 parentAllocator.childClosed(this);
446 }
447
448 if (DEBUG) {
449 historicalLog.recordEvent("closed");
450 logger.debug(String.format("closed allocator[%s].", name));
451 }
452
453
454 }
455
456 @Override
457 public String toString() {
458 final Verbosity verbosity = logger.isTraceEnabled() ? Verbosity.LOG_WITH_STACKTRACE
459 : Verbosity.BASIC;
460 final StringBuilder sb = new StringBuilder();
461 print(sb, 0, verbosity);
462 return sb.toString();
463 }
464
465 /**
466 * Provide a verbose string of the current allocator state. Includes the state of all child
467 * allocators, along with
468 * historical logs of each object and including stacktraces.
469 *
470 * @return A Verbose string of current allocator state.
471 */
472 @Override
473 public String toVerboseString() {
474 final StringBuilder sb = new StringBuilder();
475 print(sb, 0, Verbosity.LOG_WITH_STACKTRACE);
476 return sb.toString();
477 }
478
479 private void hist(String noteFormat, Object... args) {
480 historicalLog.recordEvent(noteFormat, args);
481 }
482
483 /**
484 * Verifies the accounting state of the allocator. Only works for DEBUG.
485 *
486 * @throws IllegalStateException when any problems are found
487 */
488 void verifyAllocator() {
489 final IdentityHashMap<AllocationManager, BaseAllocator> seen = new IdentityHashMap<>();
490 verifyAllocator(seen);
491 }
492
493 /**
494 * Verifies the accounting state of the allocator (Only works for DEBUG)
495 * This overload is used for recursive calls, allowing for checking
496 * that ArrowBufs are unique across all allocators that are checked.
497 *
498 * @param buffersSeen a map of buffers that have already been seen when walking a tree of
499 * allocators
500 * @throws IllegalStateException when any problems are found
501 */
502 private void verifyAllocator(
503 final IdentityHashMap<AllocationManager, BaseAllocator> buffersSeen) {
504 // The remaining tests can only be performed if we're in debug mode.
505 if (!DEBUG) {
506 return;
507 }
508
509 synchronized (DEBUG_LOCK) {
510 final long allocated = getAllocatedMemory();
511
512 // verify my direct descendants
513 final Set<BaseAllocator> childSet = childAllocators.keySet();
514 for (final BaseAllocator childAllocator : childSet) {
515 childAllocator.verifyAllocator(buffersSeen);
516 }
517
518 /*
519 * Verify my relationships with my descendants.
520 *
521 * The sum of direct child allocators' owned memory must be <= my allocated memory; my
522 * allocated memory also
523 * includes ArrowBuf's directly allocated by me.
524 */
525 long childTotal = 0;
526 for (final BaseAllocator childAllocator : childSet) {
527 childTotal += Math.max(childAllocator.getAllocatedMemory(), childAllocator.reservation);
528 }
529 if (childTotal > getAllocatedMemory()) {
530 historicalLog.logHistory(logger);
531 logger.debug("allocator[" + name + "] child event logs BEGIN");
532 for (final BaseAllocator childAllocator : childSet) {
533 childAllocator.historicalLog.logHistory(logger);
534 }
535 logger.debug("allocator[" + name + "] child event logs END");
536 throw new IllegalStateException(
537 "Child allocators own more memory (" + childTotal + ") than their parent (name = " +
538 name + " ) has allocated (" + getAllocatedMemory() + ')');
539 }
540
541 // Furthermore, the amount I've allocated should be that plus buffers I've allocated.
542 long bufferTotal = 0;
543
544 final Set<BufferLedger> ledgerSet = childLedgers.keySet();
545 for (final BufferLedger ledger : ledgerSet) {
546 if (!ledger.isOwningLedger()) {
547 continue;
548 }
549
550 final AllocationManager am = ledger.getAllocationManager();
551 /*
552 * Even when shared, ArrowBufs are rewrapped, so we should never see the same instance
553 * twice.
554 */
555 final BaseAllocator otherOwner = buffersSeen.get(am);
556 if (otherOwner != null) {
557 throw new IllegalStateException("This allocator's ArrowBuf already owned by another " +
558 "allocator");
559 }
560 buffersSeen.put(am, this);
561
562 bufferTotal += am.getSize();
563 }
564
565 // Preallocated space has to be accounted for
566 final Set<Reservation> reservationSet = reservations.keySet();
567 long reservedTotal = 0;
568 for (final Reservation reservation : reservationSet) {
569 if (!reservation.isUsed()) {
570 reservedTotal += reservation.getSize();
571 }
572 }
573
574 if (bufferTotal + reservedTotal + childTotal != getAllocatedMemory()) {
575 final StringBuilder sb = new StringBuilder();
576 sb.append("allocator[");
577 sb.append(name);
578 sb.append("]\nallocated: ");
579 sb.append(Long.toString(allocated));
580 sb.append(" allocated - (bufferTotal + reservedTotal + childTotal): ");
581 sb.append(Long.toString(allocated - (bufferTotal + reservedTotal + childTotal)));
582 sb.append('\n');
583
584 if (bufferTotal != 0) {
585 sb.append("buffer total: ");
586 sb.append(Long.toString(bufferTotal));
587 sb.append('\n');
588 dumpBuffers(sb, ledgerSet);
589 }
590
591 if (childTotal != 0) {
592 sb.append("child total: ");
593 sb.append(Long.toString(childTotal));
594 sb.append('\n');
595
596 for (final BaseAllocator childAllocator : childSet) {
597 sb.append("child allocator[");
598 sb.append(childAllocator.name);
599 sb.append("] owned ");
600 sb.append(Long.toString(childAllocator.getAllocatedMemory()));
601 sb.append('\n');
602 }
603 }
604
605 if (reservedTotal != 0) {
606 sb.append(String.format("reserved total : %d bytes.", reservedTotal));
607 for (final Reservation reservation : reservationSet) {
608 reservation.historicalLog.buildHistory(sb, 0, true);
609 sb.append('\n');
610 }
611 }
612
613 logger.debug(sb.toString());
614
615 final long allocated2 = getAllocatedMemory();
616
617 if (allocated2 != allocated) {
618 throw new IllegalStateException(String.format(
619 "allocator[%s]: allocated t1 (%d) + allocated t2 (%d). Someone released memory while in verification.",
620 name, allocated, allocated2));
621
622 }
623 throw new IllegalStateException(String.format(
624 "allocator[%s]: buffer space (%d) + prealloc space (%d) + child space (%d) != allocated (%d)",
625 name, bufferTotal, reservedTotal, childTotal, allocated));
626 }
627 }
628 }
629
630 void print(StringBuilder sb, int level, Verbosity verbosity) {
631
632 CommonUtil.indent(sb, level)
633 .append("Allocator(")
634 .append(name)
635 .append(") ")
636 .append(reservation)
637 .append('/')
638 .append(getAllocatedMemory())
639 .append('/')
640 .append(getPeakMemoryAllocation())
641 .append('/')
642 .append(getLimit())
643 .append(" (res/actual/peak/limit)")
644 .append('\n');
645
646 if (DEBUG) {
647 CommonUtil.indent(sb, level + 1).append(String.format("child allocators: %d\n", childAllocators.size()));
648 for (BaseAllocator child : childAllocators.keySet()) {
649 child.print(sb, level + 2, verbosity);
650 }
651
652 CommonUtil.indent(sb, level + 1).append(String.format("ledgers: %d\n", childLedgers.size()));
653 for (BufferLedger ledger : childLedgers.keySet()) {
654 ledger.print(sb, level + 2, verbosity);
655 }
656
657 final Set<Reservation> reservations = this.reservations.keySet();
658 CommonUtil.indent(sb, level + 1).append(String.format("reservations: %d\n", reservations.size()));
659 for (final Reservation reservation : reservations) {
660 if (verbosity.includeHistoricalLog) {
661 reservation.historicalLog.buildHistory(sb, level + 3, true);
662 }
663 }
664
665 }
666
667 }
668
669 private void dumpBuffers(final StringBuilder sb, final Set<BufferLedger> ledgerSet) {
670 for (final BufferLedger ledger : ledgerSet) {
671 if (!ledger.isOwningLedger()) {
672 continue;
673 }
674 final AllocationManager am = ledger.getAllocationManager();
675 sb.append("UnsafeDirectLittleEndian[identityHashCode == ");
676 sb.append(Integer.toString(System.identityHashCode(am)));
677 sb.append("] size ");
678 sb.append(Long.toString(am.getSize()));
679 sb.append('\n');
680 }
681 }
682
/**
 * Levels of detail available when describing an allocator.
 */
public enum Verbosity {
  /** Basic information only: neither historical logs nor stack traces. */
  BASIC(false, false),
  /** Basic information plus historical logs. */
  LOG(true, false),
  /** Basic information plus historical logs and stack traces. */
  LOG_WITH_STACKTRACE(true, true);

  /** Whether historical logs are part of the output. */
  public final boolean includeHistoricalLog;
  /** Whether stack traces are part of the output. */
  public final boolean includeStackTraces;

  Verbosity(boolean withHistoricalLog, boolean withStackTraces) {
    includeHistoricalLog = withHistoricalLog;
    includeStackTraces = withStackTraces;
  }
}
700
701 /**
702 * Returns a default {@link Config} instance.
703 *
704 * @see ImmutableConfig.Builder
705 */
706 public static Config defaultConfig() {
707 return DEFAULT_CONFIG;
708
709 }
710
711 /**
712 * Returns a builder class for configuring BaseAllocator's options.
713 */
714 public static ImmutableConfig.Builder configBuilder() {
715 return ImmutableConfig.builder();
716 }
717
718 @Override
719 public RoundingPolicy getRoundingPolicy() {
720 return roundingPolicy;
721 }
722
723 /**
724 * Config class of {@link BaseAllocator}.
725 */
726 @Value.Immutable
727 abstract static class Config {
728 /**
729 * Factory for creating {@link AllocationManager} instances.
730 */
731 @Value.Default
732 AllocationManager.Factory getAllocationManagerFactory() {
733 return DefaultAllocationManagerOption.getDefaultAllocationManagerFactory();
734 }
735
736 /**
737 * Listener callback. Must be non-null.
738 */
739 @Value.Default
740 AllocationListener getListener() {
741 return AllocationListener.NOOP;
742 }
743
744 /**
745 * Initial reservation size (in bytes) for this allocator.
746 */
747 @Value.Default
748 long getInitReservation() {
749 return 0;
750 }
751
752 /**
753 * Max allocation size (in bytes) for this allocator, allocations past this limit fail.
754 * Can be modified after construction.
755 */
756 @Value.Default
757 long getMaxAllocation() {
758 return Long.MAX_VALUE;
759 }
760
761 /**
762 * The policy for rounding the buffer size.
763 */
764 @Value.Default
765 RoundingPolicy getRoundingPolicy() {
766 return DefaultRoundingPolicy.DEFAULT_ROUNDING_POLICY;
767 }
768 }
769
770 /**
771 * Implementation of {@link AllocationReservation} that supports
772 * history tracking under {@linkplain #DEBUG} is true.
773 */
774 public class Reservation implements AllocationReservation {
775
776 private final HistoricalLog historicalLog;
777 private int nBytes = 0;
778 private boolean used = false;
779 private boolean closed = false;
780
781 /**
782 * Creates a new reservation.
783 *
784 * <p>If {@linkplain #DEBUG} is true this will capture a historical
785 * log of events relevant to this Reservation.
786 */
787 public Reservation() {
788 if (DEBUG) {
789 historicalLog = new HistoricalLog("Reservation[allocator[%s], %d]", name, System
790 .identityHashCode(this));
791 historicalLog.recordEvent("created");
792 synchronized (DEBUG_LOCK) {
793 reservations.put(this, this);
794 }
795 } else {
796 historicalLog = null;
797 }
798 }
799
800 @Override
801 public boolean add(final int nBytes) {
802 assertOpen();
803
804 Preconditions.checkArgument(nBytes >= 0, "nBytes(%d) < 0", nBytes);
805 Preconditions.checkState(!closed, "Attempt to increase reservation after reservation has been closed");
806 Preconditions.checkState(!used, "Attempt to increase reservation after reservation has been used");
807
808 // we round up to next power of two since all reservations are done in powers of two. This
809 // may overestimate the
810 // preallocation since someone may perceive additions to be power of two. If this becomes a
811 // problem, we can look
812 // at
813 // modifying this behavior so that we maintain what we reserve and what the user asked for
814 // and make sure to only
815 // round to power of two as necessary.
816 final int nBytesTwo = CommonUtil.nextPowerOfTwo(nBytes);
817 if (!reserve(nBytesTwo)) {
818 return false;
819 }
820
821 this.nBytes += nBytesTwo;
822 return true;
823 }
824
825 @Override
826 public ArrowBuf allocateBuffer() {
827 assertOpen();
828
829 Preconditions.checkState(!closed, "Attempt to allocate after closed");
830 Preconditions.checkState(!used, "Attempt to allocate more than once");
831
832 final ArrowBuf arrowBuf = allocate(nBytes);
833 used = true;
834 return arrowBuf;
835 }
836
837 @Override
838 public int getSize() {
839 return nBytes;
840 }
841
842 @Override
843 public boolean isUsed() {
844 return used;
845 }
846
847 @Override
848 public boolean isClosed() {
849 return closed;
850 }
851
852 @Override
853 public void close() {
854 assertOpen();
855
856 if (closed) {
857 return;
858 }
859
860 if (DEBUG) {
861 if (!isClosed()) {
862 final Object object;
863 synchronized (DEBUG_LOCK) {
864 object = reservations.remove(this);
865 }
866 if (object == null) {
867 final StringBuilder sb = new StringBuilder();
868 print(sb, 0, Verbosity.LOG_WITH_STACKTRACE);
869 logger.debug(sb.toString());
870 throw new IllegalStateException(String.format("Didn't find closing reservation[%d]",
871 System.identityHashCode(this)));
872 }
873
874 historicalLog.recordEvent("closed");
875 }
876 }
877
878 if (!used) {
879 releaseReservation(nBytes);
880 }
881
882 closed = true;
883 }
884
885 @Override
886 public boolean reserve(int nBytes) {
887 assertOpen();
888
889 final AllocationOutcome outcome = BaseAllocator.this.allocateBytes(nBytes);
890
891 if (DEBUG) {
892 historicalLog.recordEvent("reserve(%d) => %s", nBytes, Boolean.toString(outcome.isOk()));
893 }
894
895 return outcome.isOk();
896 }
897
898 /**
899 * Allocate a buffer of the requested size.
900 *
901 * <p>The implementation of the allocator's inner class provides this.
902 *
903 * @param nBytes the size of the buffer requested
904 * @return the buffer, or null, if the request cannot be satisfied
905 */
906 private ArrowBuf allocate(int nBytes) {
907 assertOpen();
908
909 boolean success = false;
910
911 /*
912 * The reservation already added the requested bytes to the allocators owned and allocated
913 * bytes via reserve().
914 * This ensures that they can't go away. But when we ask for the buffer here, that will add
915 * to the allocated bytes
916 * as well, so we need to return the same number back to avoid double-counting them.
917 */
918 try {
919 final ArrowBuf arrowBuf = BaseAllocator.this.bufferWithoutReservation(nBytes, null);
920
921 listener.onAllocation(nBytes);
922 if (DEBUG) {
923 historicalLog.recordEvent("allocate() => %s", String.format("ArrowBuf[%d]", arrowBuf
924 .getId()));
925 }
926 success = true;
927 return arrowBuf;
928 } finally {
929 if (!success) {
930 releaseBytes(nBytes);
931 }
932 }
933 }
934
935 /**
936 * Return the reservation back to the allocator without having used it.
937 *
938 * @param nBytes the size of the reservation
939 */
940 private void releaseReservation(int nBytes) {
941 assertOpen();
942
943 releaseBytes(nBytes);
944
945 if (DEBUG) {
946 historicalLog.recordEvent("releaseReservation(%d)", nBytes);
947 }
948 }
949
950 }
951 }