]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.arrow.memory; | |
19 | ||
20 | import java.util.Collection; | |
21 | import java.util.Collections; | |
22 | import java.util.HashSet; | |
23 | import java.util.IdentityHashMap; | |
24 | import java.util.Map; | |
25 | import java.util.Set; | |
26 | ||
27 | import org.apache.arrow.memory.rounding.DefaultRoundingPolicy; | |
28 | import org.apache.arrow.memory.rounding.RoundingPolicy; | |
29 | import org.apache.arrow.memory.util.AssertionUtil; | |
30 | import org.apache.arrow.memory.util.CommonUtil; | |
31 | import org.apache.arrow.memory.util.HistoricalLog; | |
32 | import org.apache.arrow.util.Preconditions; | |
33 | import org.immutables.value.Value; | |
34 | ||
35 | /** | |
36 | * A base-class that implements all functionality of {@linkplain BufferAllocator}s. | |
37 | * | |
38 | * <p>The class is abstract to enforce usage of {@linkplain RootAllocator}/{@linkplain ChildAllocator} | |
39 | * facades. | |
40 | */ | |
41 | abstract class BaseAllocator extends Accountant implements BufferAllocator { | |
42 | ||
/** Name of the system property that, when set, decides whether debug mode is on. */
public static final String DEBUG_ALLOCATOR = "arrow.memory.debug.allocator";
/** First argument given to each allocator's {@link HistoricalLog} in debug mode (presumably its length limit). */
public static final int DEBUG_LOG_LENGTH = 6;
/** Whether debug bookkeeping is enabled; assigned exactly once in the static initializer below. */
public static final boolean DEBUG;
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator.class);

// DEBUG has to be assigned before DEFAULT_CONFIG: building DEFAULT_CONFIG will eventually initialize the
// allocation manager, which in turn allocates an ArrowBuf, which requires DEBUG to have been initialized.
static {
  // An explicitly set system property takes precedence; otherwise follow the assertion status.
  final String configured = System.getProperty(DEBUG_ALLOCATOR);
  DEBUG = configured != null ? Boolean.parseBoolean(configured) : AssertionUtil.isAssertionsEnabled();
  final String state = DEBUG ? "enabled." : "disabled.";
  logger.info("Debug mode " + state);
}

/** Configuration in which every option keeps its default value. */
public static final Config DEFAULT_CONFIG = ImmutableConfig.builder().build();
62 | ||
63 | // Package exposed for sharing between AllocatorManger and BaseAllocator objects | |
64 | private final String name; | |
65 | private final RootAllocator root; | |
66 | private final Object DEBUG_LOCK = DEBUG ? new Object() : null; | |
67 | private final AllocationListener listener; | |
68 | private final BaseAllocator parentAllocator; | |
69 | private final Map<BaseAllocator, Object> childAllocators; | |
70 | private final ArrowBuf empty; | |
71 | // members used purely for debugging | |
72 | private final IdentityHashMap<BufferLedger, Object> childLedgers; | |
73 | private final IdentityHashMap<Reservation, Object> reservations; | |
74 | private final HistoricalLog historicalLog; | |
75 | private final RoundingPolicy roundingPolicy; | |
76 | private final AllocationManager.Factory allocationManagerFactory; | |
77 | ||
78 | private volatile boolean isClosed = false; // the allocator has been closed | |
79 | ||
80 | /** | |
81 | * Initialize an allocator. | |
82 | * | |
83 | * @param parentAllocator parent allocator. null if defining a root allocator | |
84 | * @param name name of this allocator | |
85 | * @param config configuration including other options of this allocator | |
86 | * | |
87 | * @see Config | |
88 | */ | |
89 | protected BaseAllocator( | |
90 | final BaseAllocator parentAllocator, | |
91 | final String name, | |
92 | final Config config) throws OutOfMemoryException { | |
93 | super(parentAllocator, name, config.getInitReservation(), config.getMaxAllocation()); | |
94 | ||
95 | this.listener = config.getListener(); | |
96 | this.allocationManagerFactory = config.getAllocationManagerFactory(); | |
97 | ||
98 | if (parentAllocator != null) { | |
99 | this.root = parentAllocator.root; | |
100 | empty = parentAllocator.empty; | |
101 | } else if (this instanceof RootAllocator) { | |
102 | this.root = (RootAllocator) this; | |
103 | empty = createEmpty(); | |
104 | } else { | |
105 | throw new IllegalStateException("An parent allocator must either carry a root or be the " + | |
106 | "root."); | |
107 | } | |
108 | ||
109 | this.parentAllocator = parentAllocator; | |
110 | this.name = name; | |
111 | ||
112 | this.childAllocators = Collections.synchronizedMap(new IdentityHashMap<>()); | |
113 | ||
114 | if (DEBUG) { | |
115 | reservations = new IdentityHashMap<>(); | |
116 | childLedgers = new IdentityHashMap<>(); | |
117 | historicalLog = new HistoricalLog(DEBUG_LOG_LENGTH, "allocator[%s]", name); | |
118 | hist("created by \"%s\", owned = %d", name, this.getAllocatedMemory()); | |
119 | } else { | |
120 | reservations = null; | |
121 | historicalLog = null; | |
122 | childLedgers = null; | |
123 | } | |
124 | this.roundingPolicy = config.getRoundingPolicy(); | |
125 | } | |
126 | ||
127 | @Override | |
128 | public AllocationListener getListener() { | |
129 | return listener; | |
130 | } | |
131 | ||
132 | @Override | |
133 | public BaseAllocator getParentAllocator() { | |
134 | return parentAllocator; | |
135 | } | |
136 | ||
137 | @Override | |
138 | public Collection<BufferAllocator> getChildAllocators() { | |
139 | synchronized (childAllocators) { | |
140 | return new HashSet<>(childAllocators.keySet()); | |
141 | } | |
142 | } | |
143 | ||
144 | private static String createErrorMsg(final BufferAllocator allocator, final long rounded, final long requested) { | |
145 | if (rounded != requested) { | |
146 | return String.format( | |
147 | "Unable to allocate buffer of size %d (rounded from %d) due to memory limit. Current " + | |
148 | "allocation: %d", rounded, requested, allocator.getAllocatedMemory()); | |
149 | } else { | |
150 | return String.format( | |
151 | "Unable to allocate buffer of size %d due to memory limit. Current " + | |
152 | "allocation: %d", rounded, allocator.getAllocatedMemory()); | |
153 | } | |
154 | } | |
155 | ||
156 | public static boolean isDebug() { | |
157 | return DEBUG; | |
158 | } | |
159 | ||
160 | @Override | |
161 | public void assertOpen() { | |
162 | if (AssertionUtil.ASSERT_ENABLED) { | |
163 | if (isClosed) { | |
164 | throw new IllegalStateException("Attempting operation on allocator when allocator is closed.\n" + | |
165 | toVerboseString()); | |
166 | } | |
167 | } | |
168 | } | |
169 | ||
170 | @Override | |
171 | public String getName() { | |
172 | return name; | |
173 | } | |
174 | ||
175 | @Override | |
176 | public ArrowBuf getEmpty() { | |
177 | return empty; | |
178 | } | |
179 | ||
180 | /** | |
181 | * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that | |
182 | * we have a new ledger | |
183 | * associated with this allocator. | |
184 | */ | |
185 | void associateLedger(BufferLedger ledger) { | |
186 | assertOpen(); | |
187 | if (DEBUG) { | |
188 | synchronized (DEBUG_LOCK) { | |
189 | childLedgers.put(ledger, null); | |
190 | } | |
191 | } | |
192 | } | |
193 | ||
194 | /** | |
195 | * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that | |
196 | * we are removing a | |
197 | * ledger associated with this allocator | |
198 | */ | |
199 | void dissociateLedger(BufferLedger ledger) { | |
200 | assertOpen(); | |
201 | if (DEBUG) { | |
202 | synchronized (DEBUG_LOCK) { | |
203 | if (!childLedgers.containsKey(ledger)) { | |
204 | throw new IllegalStateException("Trying to remove a child ledger that doesn't exist."); | |
205 | } | |
206 | childLedgers.remove(ledger); | |
207 | } | |
208 | } | |
209 | } | |
210 | ||
211 | /** | |
212 | * Track when a ChildAllocator of this BaseAllocator is closed. Used for debugging purposes. | |
213 | * | |
214 | * @param childAllocator The child allocator that has been closed. | |
215 | */ | |
216 | private void childClosed(final BaseAllocator childAllocator) { | |
217 | assertOpen(); | |
218 | ||
219 | if (DEBUG) { | |
220 | Preconditions.checkArgument(childAllocator != null, "child allocator can't be null"); | |
221 | ||
222 | synchronized (DEBUG_LOCK) { | |
223 | final Object object = childAllocators.remove(childAllocator); | |
224 | if (object == null) { | |
225 | childAllocator.historicalLog.logHistory(logger); | |
226 | throw new IllegalStateException("Child allocator[" + childAllocator.name + | |
227 | "] not found in parent allocator[" + name + "]'s childAllocators"); | |
228 | } | |
229 | } | |
230 | } else { | |
231 | childAllocators.remove(childAllocator); | |
232 | } | |
233 | listener.onChildRemoved(this, childAllocator); | |
234 | } | |
235 | ||
236 | @Override | |
237 | public ArrowBuf buffer(final long initialRequestSize) { | |
238 | assertOpen(); | |
239 | ||
240 | return buffer(initialRequestSize, null); | |
241 | } | |
242 | ||
243 | private ArrowBuf createEmpty() { | |
244 | return allocationManagerFactory.empty(); | |
245 | } | |
246 | ||
247 | @Override | |
248 | public ArrowBuf buffer(final long initialRequestSize, BufferManager manager) { | |
249 | assertOpen(); | |
250 | ||
251 | Preconditions.checkArgument(initialRequestSize >= 0, "the requested size must be non-negative"); | |
252 | ||
253 | if (initialRequestSize == 0) { | |
254 | return getEmpty(); | |
255 | } | |
256 | ||
257 | // round the request size according to the rounding policy | |
258 | final long actualRequestSize = roundingPolicy.getRoundedSize(initialRequestSize); | |
259 | ||
260 | listener.onPreAllocation(actualRequestSize); | |
261 | ||
262 | AllocationOutcome outcome = this.allocateBytes(actualRequestSize); | |
263 | if (!outcome.isOk()) { | |
264 | if (listener.onFailedAllocation(actualRequestSize, outcome)) { | |
265 | // Second try, in case the listener can do something about it | |
266 | outcome = this.allocateBytes(actualRequestSize); | |
267 | } | |
268 | if (!outcome.isOk()) { | |
269 | throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, | |
270 | initialRequestSize), outcome.getDetails()); | |
271 | } | |
272 | } | |
273 | ||
274 | boolean success = false; | |
275 | try { | |
276 | ArrowBuf buffer = bufferWithoutReservation(actualRequestSize, manager); | |
277 | success = true; | |
278 | listener.onAllocation(actualRequestSize); | |
279 | return buffer; | |
280 | } catch (OutOfMemoryError e) { | |
281 | throw e; | |
282 | } finally { | |
283 | if (!success) { | |
284 | releaseBytes(actualRequestSize); | |
285 | } | |
286 | } | |
287 | } | |
288 | ||
289 | /** | |
290 | * Used by usual allocation as well as for allocating a pre-reserved buffer. | |
291 | * Skips the typical accounting associated with creating a new buffer. | |
292 | */ | |
293 | private ArrowBuf bufferWithoutReservation( | |
294 | final long size, | |
295 | BufferManager bufferManager) throws OutOfMemoryException { | |
296 | assertOpen(); | |
297 | ||
298 | final AllocationManager manager = newAllocationManager(size); | |
299 | final BufferLedger ledger = manager.associate(this); // +1 ref cnt (required) | |
300 | final ArrowBuf buffer = ledger.newArrowBuf(size, bufferManager); | |
301 | ||
302 | // make sure that our allocation is equal to what we expected. | |
303 | Preconditions.checkArgument(buffer.capacity() == size, | |
304 | "Allocated capacity %d was not equal to requested capacity %d.", buffer.capacity(), size); | |
305 | ||
306 | return buffer; | |
307 | } | |
308 | ||
309 | private AllocationManager newAllocationManager(long size) { | |
310 | return newAllocationManager(this, size); | |
311 | } | |
312 | ||
313 | ||
314 | private AllocationManager newAllocationManager(BaseAllocator accountingAllocator, long size) { | |
315 | return allocationManagerFactory.create(accountingAllocator, size); | |
316 | } | |
317 | ||
318 | @Override | |
319 | public BufferAllocator getRoot() { | |
320 | return root; | |
321 | } | |
322 | ||
323 | @Override | |
324 | public BufferAllocator newChildAllocator( | |
325 | final String name, | |
326 | final long initReservation, | |
327 | final long maxAllocation) { | |
328 | return newChildAllocator(name, this.listener, initReservation, maxAllocation); | |
329 | } | |
330 | ||
331 | @Override | |
332 | public BufferAllocator newChildAllocator( | |
333 | final String name, | |
334 | final AllocationListener listener, | |
335 | final long initReservation, | |
336 | final long maxAllocation) { | |
337 | assertOpen(); | |
338 | ||
339 | final ChildAllocator childAllocator = | |
340 | new ChildAllocator(this, name, configBuilder() | |
341 | .listener(listener) | |
342 | .initReservation(initReservation) | |
343 | .maxAllocation(maxAllocation) | |
344 | .roundingPolicy(roundingPolicy) | |
345 | .allocationManagerFactory(allocationManagerFactory) | |
346 | .build()); | |
347 | ||
348 | if (DEBUG) { | |
349 | synchronized (DEBUG_LOCK) { | |
350 | childAllocators.put(childAllocator, childAllocator); | |
351 | historicalLog.recordEvent("allocator[%s] created new child allocator[%s]", name, | |
352 | childAllocator.getName()); | |
353 | } | |
354 | } else { | |
355 | childAllocators.put(childAllocator, childAllocator); | |
356 | } | |
357 | this.listener.onChildAdded(this, childAllocator); | |
358 | ||
359 | return childAllocator; | |
360 | } | |
361 | ||
362 | @Override | |
363 | public AllocationReservation newReservation() { | |
364 | assertOpen(); | |
365 | ||
366 | return new Reservation(); | |
367 | } | |
368 | ||
369 | @Override | |
370 | public synchronized void close() { | |
371 | /* | |
372 | * Some owners may close more than once because of complex cleanup and shutdown | |
373 | * procedures. | |
374 | */ | |
375 | if (isClosed) { | |
376 | return; | |
377 | } | |
378 | ||
379 | isClosed = true; | |
380 | ||
381 | StringBuilder outstandingChildAllocators = new StringBuilder(); | |
382 | if (DEBUG) { | |
383 | synchronized (DEBUG_LOCK) { | |
384 | verifyAllocator(); | |
385 | ||
386 | // are there outstanding child allocators? | |
387 | if (!childAllocators.isEmpty()) { | |
388 | for (final BaseAllocator childAllocator : childAllocators.keySet()) { | |
389 | if (childAllocator.isClosed) { | |
390 | logger.warn(String.format( | |
391 | "Closed child allocator[%s] on parent allocator[%s]'s child list.\n%s", | |
392 | childAllocator.name, name, toString())); | |
393 | } | |
394 | } | |
395 | ||
396 | throw new IllegalStateException( | |
397 | String.format("Allocator[%s] closed with outstanding child allocators.\n%s", name, | |
398 | toString())); | |
399 | } | |
400 | ||
401 | // are there outstanding buffers? | |
402 | final int allocatedCount = childLedgers.size(); | |
403 | if (allocatedCount > 0) { | |
404 | throw new IllegalStateException( | |
405 | String.format("Allocator[%s] closed with outstanding buffers allocated (%d).\n%s", | |
406 | name, allocatedCount, toString())); | |
407 | } | |
408 | ||
409 | if (reservations.size() != 0) { | |
410 | throw new IllegalStateException( | |
411 | String.format("Allocator[%s] closed with outstanding reservations (%d).\n%s", name, | |
412 | reservations.size(), | |
413 | toString())); | |
414 | } | |
415 | ||
416 | } | |
417 | } else { | |
418 | if (!childAllocators.isEmpty()) { | |
419 | outstandingChildAllocators.append("Outstanding child allocators : \n"); | |
420 | synchronized (childAllocators) { | |
421 | for (final BaseAllocator childAllocator : childAllocators.keySet()) { | |
422 | outstandingChildAllocators.append(String.format(" %s", childAllocator.toString())); | |
423 | } | |
424 | } | |
425 | } | |
426 | } | |
427 | ||
428 | // Is there unaccounted-for outstanding allocation? | |
429 | final long allocated = getAllocatedMemory(); | |
430 | if (allocated > 0) { | |
431 | if (parent != null && reservation > allocated) { | |
432 | parent.releaseBytes(reservation - allocated); | |
433 | } | |
434 | String msg = String.format("Memory was leaked by query. Memory leaked: (%d)\n%s%s", allocated, | |
435 | outstandingChildAllocators.toString(), toString()); | |
436 | logger.error(msg); | |
437 | throw new IllegalStateException(msg); | |
438 | } | |
439 | ||
440 | // we need to release our memory to our parent before we tell it we've closed. | |
441 | super.close(); | |
442 | ||
443 | // Inform our parent allocator that we've closed | |
444 | if (parentAllocator != null) { | |
445 | parentAllocator.childClosed(this); | |
446 | } | |
447 | ||
448 | if (DEBUG) { | |
449 | historicalLog.recordEvent("closed"); | |
450 | logger.debug(String.format("closed allocator[%s].", name)); | |
451 | } | |
452 | ||
453 | ||
454 | } | |
455 | ||
456 | @Override | |
457 | public String toString() { | |
458 | final Verbosity verbosity = logger.isTraceEnabled() ? Verbosity.LOG_WITH_STACKTRACE | |
459 | : Verbosity.BASIC; | |
460 | final StringBuilder sb = new StringBuilder(); | |
461 | print(sb, 0, verbosity); | |
462 | return sb.toString(); | |
463 | } | |
464 | ||
465 | /** | |
466 | * Provide a verbose string of the current allocator state. Includes the state of all child | |
467 | * allocators, along with | |
468 | * historical logs of each object and including stacktraces. | |
469 | * | |
470 | * @return A Verbose string of current allocator state. | |
471 | */ | |
472 | @Override | |
473 | public String toVerboseString() { | |
474 | final StringBuilder sb = new StringBuilder(); | |
475 | print(sb, 0, Verbosity.LOG_WITH_STACKTRACE); | |
476 | return sb.toString(); | |
477 | } | |
478 | ||
479 | private void hist(String noteFormat, Object... args) { | |
480 | historicalLog.recordEvent(noteFormat, args); | |
481 | } | |
482 | ||
483 | /** | |
484 | * Verifies the accounting state of the allocator. Only works for DEBUG. | |
485 | * | |
486 | * @throws IllegalStateException when any problems are found | |
487 | */ | |
488 | void verifyAllocator() { | |
489 | final IdentityHashMap<AllocationManager, BaseAllocator> seen = new IdentityHashMap<>(); | |
490 | verifyAllocator(seen); | |
491 | } | |
492 | ||
493 | /** | |
494 | * Verifies the accounting state of the allocator (Only works for DEBUG) | |
495 | * This overload is used for recursive calls, allowing for checking | |
496 | * that ArrowBufs are unique across all allocators that are checked. | |
497 | * | |
498 | * @param buffersSeen a map of buffers that have already been seen when walking a tree of | |
499 | * allocators | |
500 | * @throws IllegalStateException when any problems are found | |
501 | */ | |
502 | private void verifyAllocator( | |
503 | final IdentityHashMap<AllocationManager, BaseAllocator> buffersSeen) { | |
504 | // The remaining tests can only be performed if we're in debug mode. | |
505 | if (!DEBUG) { | |
506 | return; | |
507 | } | |
508 | ||
509 | synchronized (DEBUG_LOCK) { | |
510 | final long allocated = getAllocatedMemory(); | |
511 | ||
512 | // verify my direct descendants | |
513 | final Set<BaseAllocator> childSet = childAllocators.keySet(); | |
514 | for (final BaseAllocator childAllocator : childSet) { | |
515 | childAllocator.verifyAllocator(buffersSeen); | |
516 | } | |
517 | ||
518 | /* | |
519 | * Verify my relationships with my descendants. | |
520 | * | |
521 | * The sum of direct child allocators' owned memory must be <= my allocated memory; my | |
522 | * allocated memory also | |
523 | * includes ArrowBuf's directly allocated by me. | |
524 | */ | |
525 | long childTotal = 0; | |
526 | for (final BaseAllocator childAllocator : childSet) { | |
527 | childTotal += Math.max(childAllocator.getAllocatedMemory(), childAllocator.reservation); | |
528 | } | |
529 | if (childTotal > getAllocatedMemory()) { | |
530 | historicalLog.logHistory(logger); | |
531 | logger.debug("allocator[" + name + "] child event logs BEGIN"); | |
532 | for (final BaseAllocator childAllocator : childSet) { | |
533 | childAllocator.historicalLog.logHistory(logger); | |
534 | } | |
535 | logger.debug("allocator[" + name + "] child event logs END"); | |
536 | throw new IllegalStateException( | |
537 | "Child allocators own more memory (" + childTotal + ") than their parent (name = " + | |
538 | name + " ) has allocated (" + getAllocatedMemory() + ')'); | |
539 | } | |
540 | ||
541 | // Furthermore, the amount I've allocated should be that plus buffers I've allocated. | |
542 | long bufferTotal = 0; | |
543 | ||
544 | final Set<BufferLedger> ledgerSet = childLedgers.keySet(); | |
545 | for (final BufferLedger ledger : ledgerSet) { | |
546 | if (!ledger.isOwningLedger()) { | |
547 | continue; | |
548 | } | |
549 | ||
550 | final AllocationManager am = ledger.getAllocationManager(); | |
551 | /* | |
552 | * Even when shared, ArrowBufs are rewrapped, so we should never see the same instance | |
553 | * twice. | |
554 | */ | |
555 | final BaseAllocator otherOwner = buffersSeen.get(am); | |
556 | if (otherOwner != null) { | |
557 | throw new IllegalStateException("This allocator's ArrowBuf already owned by another " + | |
558 | "allocator"); | |
559 | } | |
560 | buffersSeen.put(am, this); | |
561 | ||
562 | bufferTotal += am.getSize(); | |
563 | } | |
564 | ||
565 | // Preallocated space has to be accounted for | |
566 | final Set<Reservation> reservationSet = reservations.keySet(); | |
567 | long reservedTotal = 0; | |
568 | for (final Reservation reservation : reservationSet) { | |
569 | if (!reservation.isUsed()) { | |
570 | reservedTotal += reservation.getSize(); | |
571 | } | |
572 | } | |
573 | ||
574 | if (bufferTotal + reservedTotal + childTotal != getAllocatedMemory()) { | |
575 | final StringBuilder sb = new StringBuilder(); | |
576 | sb.append("allocator["); | |
577 | sb.append(name); | |
578 | sb.append("]\nallocated: "); | |
579 | sb.append(Long.toString(allocated)); | |
580 | sb.append(" allocated - (bufferTotal + reservedTotal + childTotal): "); | |
581 | sb.append(Long.toString(allocated - (bufferTotal + reservedTotal + childTotal))); | |
582 | sb.append('\n'); | |
583 | ||
584 | if (bufferTotal != 0) { | |
585 | sb.append("buffer total: "); | |
586 | sb.append(Long.toString(bufferTotal)); | |
587 | sb.append('\n'); | |
588 | dumpBuffers(sb, ledgerSet); | |
589 | } | |
590 | ||
591 | if (childTotal != 0) { | |
592 | sb.append("child total: "); | |
593 | sb.append(Long.toString(childTotal)); | |
594 | sb.append('\n'); | |
595 | ||
596 | for (final BaseAllocator childAllocator : childSet) { | |
597 | sb.append("child allocator["); | |
598 | sb.append(childAllocator.name); | |
599 | sb.append("] owned "); | |
600 | sb.append(Long.toString(childAllocator.getAllocatedMemory())); | |
601 | sb.append('\n'); | |
602 | } | |
603 | } | |
604 | ||
605 | if (reservedTotal != 0) { | |
606 | sb.append(String.format("reserved total : %d bytes.", reservedTotal)); | |
607 | for (final Reservation reservation : reservationSet) { | |
608 | reservation.historicalLog.buildHistory(sb, 0, true); | |
609 | sb.append('\n'); | |
610 | } | |
611 | } | |
612 | ||
613 | logger.debug(sb.toString()); | |
614 | ||
615 | final long allocated2 = getAllocatedMemory(); | |
616 | ||
617 | if (allocated2 != allocated) { | |
618 | throw new IllegalStateException(String.format( | |
619 | "allocator[%s]: allocated t1 (%d) + allocated t2 (%d). Someone released memory while in verification.", | |
620 | name, allocated, allocated2)); | |
621 | ||
622 | } | |
623 | throw new IllegalStateException(String.format( | |
624 | "allocator[%s]: buffer space (%d) + prealloc space (%d) + child space (%d) != allocated (%d)", | |
625 | name, bufferTotal, reservedTotal, childTotal, allocated)); | |
626 | } | |
627 | } | |
628 | } | |
629 | ||
630 | void print(StringBuilder sb, int level, Verbosity verbosity) { | |
631 | ||
632 | CommonUtil.indent(sb, level) | |
633 | .append("Allocator(") | |
634 | .append(name) | |
635 | .append(") ") | |
636 | .append(reservation) | |
637 | .append('/') | |
638 | .append(getAllocatedMemory()) | |
639 | .append('/') | |
640 | .append(getPeakMemoryAllocation()) | |
641 | .append('/') | |
642 | .append(getLimit()) | |
643 | .append(" (res/actual/peak/limit)") | |
644 | .append('\n'); | |
645 | ||
646 | if (DEBUG) { | |
647 | CommonUtil.indent(sb, level + 1).append(String.format("child allocators: %d\n", childAllocators.size())); | |
648 | for (BaseAllocator child : childAllocators.keySet()) { | |
649 | child.print(sb, level + 2, verbosity); | |
650 | } | |
651 | ||
652 | CommonUtil.indent(sb, level + 1).append(String.format("ledgers: %d\n", childLedgers.size())); | |
653 | for (BufferLedger ledger : childLedgers.keySet()) { | |
654 | ledger.print(sb, level + 2, verbosity); | |
655 | } | |
656 | ||
657 | final Set<Reservation> reservations = this.reservations.keySet(); | |
658 | CommonUtil.indent(sb, level + 1).append(String.format("reservations: %d\n", reservations.size())); | |
659 | for (final Reservation reservation : reservations) { | |
660 | if (verbosity.includeHistoricalLog) { | |
661 | reservation.historicalLog.buildHistory(sb, level + 3, true); | |
662 | } | |
663 | } | |
664 | ||
665 | } | |
666 | ||
667 | } | |
668 | ||
669 | private void dumpBuffers(final StringBuilder sb, final Set<BufferLedger> ledgerSet) { | |
670 | for (final BufferLedger ledger : ledgerSet) { | |
671 | if (!ledger.isOwningLedger()) { | |
672 | continue; | |
673 | } | |
674 | final AllocationManager am = ledger.getAllocationManager(); | |
675 | sb.append("UnsafeDirectLittleEndian[identityHashCode == "); | |
676 | sb.append(Integer.toString(System.identityHashCode(am))); | |
677 | sb.append("] size "); | |
678 | sb.append(Long.toString(am.getSize())); | |
679 | sb.append('\n'); | |
680 | } | |
681 | } | |
682 | ||
/**
 * Level of detail used when describing an allocator.
 */
public enum Verbosity {
  /** Basic information only: no historical log, no stack traces. */
  BASIC(false, false),
  /** Basic information plus the historical log. */
  LOG(true, false),
  /** Basic information plus the historical log and stack traces. */
  LOG_WITH_STACKTRACE(true, true);

  /** Whether the historical log is part of the output. */
  public final boolean includeHistoricalLog;
  /** Whether stack traces are part of the output. */
  public final boolean includeStackTraces;

  Verbosity(boolean withHistoricalLog, boolean withStackTraces) {
    includeHistoricalLog = withHistoricalLog;
    includeStackTraces = withStackTraces;
  }
}
700 | ||
701 | /** | |
702 | * Returns a default {@link Config} instance. | |
703 | * | |
704 | * @see ImmutableConfig.Builder | |
705 | */ | |
706 | public static Config defaultConfig() { | |
707 | return DEFAULT_CONFIG; | |
708 | ||
709 | } | |
710 | ||
711 | /** | |
712 | * Returns a builder class for configuring BaseAllocator's options. | |
713 | */ | |
714 | public static ImmutableConfig.Builder configBuilder() { | |
715 | return ImmutableConfig.builder(); | |
716 | } | |
717 | ||
718 | @Override | |
719 | public RoundingPolicy getRoundingPolicy() { | |
720 | return roundingPolicy; | |
721 | } | |
722 | ||
/**
 * Config class of {@link BaseAllocator}.
 *
 * <p>Every option below is annotated as a default value; an option left unset takes the value
 * its method returns here. Instances are built through {@code ImmutableConfig.builder()}.
 */
@Value.Immutable
abstract static class Config {
  /**
   * Factory for creating {@link AllocationManager} instances.
   * Defaults to the factory reported by {@code DefaultAllocationManagerOption}.
   */
  @Value.Default
  AllocationManager.Factory getAllocationManagerFactory() {
    return DefaultAllocationManagerOption.getDefaultAllocationManagerFactory();
  }

  /**
   * Listener callback. Must be non-null. Defaults to {@code AllocationListener.NOOP}.
   */
  @Value.Default
  AllocationListener getListener() {
    return AllocationListener.NOOP;
  }

  /**
   * Initial reservation size (in bytes) for this allocator. Defaults to 0.
   */
  @Value.Default
  long getInitReservation() {
    return 0;
  }

  /**
   * Max allocation size (in bytes) for this allocator, allocations past this limit fail.
   * Can be modified after construction. Defaults to {@code Long.MAX_VALUE}.
   */
  @Value.Default
  long getMaxAllocation() {
    return Long.MAX_VALUE;
  }

  /**
   * The policy for rounding the buffer size.
   * Defaults to {@code DefaultRoundingPolicy.DEFAULT_ROUNDING_POLICY}.
   */
  @Value.Default
  RoundingPolicy getRoundingPolicy() {
    return DefaultRoundingPolicy.DEFAULT_ROUNDING_POLICY;
  }
}
769 | ||
770 | /** | |
771 | * Implementation of {@link AllocationReservation} that supports | |
772 | * history tracking under {@linkplain #DEBUG} is true. | |
773 | */ | |
774 | public class Reservation implements AllocationReservation { | |
775 | ||
776 | private final HistoricalLog historicalLog; | |
777 | private int nBytes = 0; | |
778 | private boolean used = false; | |
779 | private boolean closed = false; | |
780 | ||
781 | /** | |
782 | * Creates a new reservation. | |
783 | * | |
784 | * <p>If {@linkplain #DEBUG} is true this will capture a historical | |
785 | * log of events relevant to this Reservation. | |
786 | */ | |
787 | public Reservation() { | |
788 | if (DEBUG) { | |
789 | historicalLog = new HistoricalLog("Reservation[allocator[%s], %d]", name, System | |
790 | .identityHashCode(this)); | |
791 | historicalLog.recordEvent("created"); | |
792 | synchronized (DEBUG_LOCK) { | |
793 | reservations.put(this, this); | |
794 | } | |
795 | } else { | |
796 | historicalLog = null; | |
797 | } | |
798 | } | |
799 | ||
800 | @Override | |
801 | public boolean add(final int nBytes) { | |
802 | assertOpen(); | |
803 | ||
804 | Preconditions.checkArgument(nBytes >= 0, "nBytes(%d) < 0", nBytes); | |
805 | Preconditions.checkState(!closed, "Attempt to increase reservation after reservation has been closed"); | |
806 | Preconditions.checkState(!used, "Attempt to increase reservation after reservation has been used"); | |
807 | ||
808 | // we round up to next power of two since all reservations are done in powers of two. This | |
809 | // may overestimate the | |
810 | // preallocation since someone may perceive additions to be power of two. If this becomes a | |
811 | // problem, we can look | |
812 | // at | |
813 | // modifying this behavior so that we maintain what we reserve and what the user asked for | |
814 | // and make sure to only | |
815 | // round to power of two as necessary. | |
816 | final int nBytesTwo = CommonUtil.nextPowerOfTwo(nBytes); | |
817 | if (!reserve(nBytesTwo)) { | |
818 | return false; | |
819 | } | |
820 | ||
821 | this.nBytes += nBytesTwo; | |
822 | return true; | |
823 | } | |
824 | ||
825 | @Override | |
826 | public ArrowBuf allocateBuffer() { | |
827 | assertOpen(); | |
828 | ||
829 | Preconditions.checkState(!closed, "Attempt to allocate after closed"); | |
830 | Preconditions.checkState(!used, "Attempt to allocate more than once"); | |
831 | ||
832 | final ArrowBuf arrowBuf = allocate(nBytes); | |
833 | used = true; | |
834 | return arrowBuf; | |
835 | } | |
836 | ||
837 | @Override | |
838 | public int getSize() { | |
839 | return nBytes; | |
840 | } | |
841 | ||
842 | @Override | |
843 | public boolean isUsed() { | |
844 | return used; | |
845 | } | |
846 | ||
847 | @Override | |
848 | public boolean isClosed() { | |
849 | return closed; | |
850 | } | |
851 | ||
852 | @Override | |
853 | public void close() { | |
854 | assertOpen(); | |
855 | ||
856 | if (closed) { | |
857 | return; | |
858 | } | |
859 | ||
860 | if (DEBUG) { | |
861 | if (!isClosed()) { | |
862 | final Object object; | |
863 | synchronized (DEBUG_LOCK) { | |
864 | object = reservations.remove(this); | |
865 | } | |
866 | if (object == null) { | |
867 | final StringBuilder sb = new StringBuilder(); | |
868 | print(sb, 0, Verbosity.LOG_WITH_STACKTRACE); | |
869 | logger.debug(sb.toString()); | |
870 | throw new IllegalStateException(String.format("Didn't find closing reservation[%d]", | |
871 | System.identityHashCode(this))); | |
872 | } | |
873 | ||
874 | historicalLog.recordEvent("closed"); | |
875 | } | |
876 | } | |
877 | ||
878 | if (!used) { | |
879 | releaseReservation(nBytes); | |
880 | } | |
881 | ||
882 | closed = true; | |
883 | } | |
884 | ||
885 | @Override | |
886 | public boolean reserve(int nBytes) { | |
887 | assertOpen(); | |
888 | ||
889 | final AllocationOutcome outcome = BaseAllocator.this.allocateBytes(nBytes); | |
890 | ||
891 | if (DEBUG) { | |
892 | historicalLog.recordEvent("reserve(%d) => %s", nBytes, Boolean.toString(outcome.isOk())); | |
893 | } | |
894 | ||
895 | return outcome.isOk(); | |
896 | } | |
897 | ||
898 | /** | |
899 | * Allocate a buffer of the requested size. | |
900 | * | |
901 | * <p>The implementation of the allocator's inner class provides this. | |
902 | * | |
903 | * @param nBytes the size of the buffer requested | |
904 | * @return the buffer, or null, if the request cannot be satisfied | |
905 | */ | |
906 | private ArrowBuf allocate(int nBytes) { | |
907 | assertOpen(); | |
908 | ||
909 | boolean success = false; | |
910 | ||
911 | /* | |
912 | * The reservation already added the requested bytes to the allocators owned and allocated | |
913 | * bytes via reserve(). | |
914 | * This ensures that they can't go away. But when we ask for the buffer here, that will add | |
915 | * to the allocated bytes | |
916 | * as well, so we need to return the same number back to avoid double-counting them. | |
917 | */ | |
918 | try { | |
919 | final ArrowBuf arrowBuf = BaseAllocator.this.bufferWithoutReservation(nBytes, null); | |
920 | ||
921 | listener.onAllocation(nBytes); | |
922 | if (DEBUG) { | |
923 | historicalLog.recordEvent("allocate() => %s", String.format("ArrowBuf[%d]", arrowBuf | |
924 | .getId())); | |
925 | } | |
926 | success = true; | |
927 | return arrowBuf; | |
928 | } finally { | |
929 | if (!success) { | |
930 | releaseBytes(nBytes); | |
931 | } | |
932 | } | |
933 | } | |
934 | ||
935 | /** | |
936 | * Return the reservation back to the allocator without having used it. | |
937 | * | |
938 | * @param nBytes the size of the reservation | |
939 | */ | |
940 | private void releaseReservation(int nBytes) { | |
941 | assertOpen(); | |
942 | ||
943 | releaseBytes(nBytes); | |
944 | ||
945 | if (DEBUG) { | |
946 | historicalLog.recordEvent("releaseReservation(%d)", nBytes); | |
947 | } | |
948 | } | |
949 | ||
950 | } | |
951 | } |