]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.arrow.memory; | |
19 | ||
20 | import java.util.concurrent.atomic.AtomicLong; | |
21 | ||
22 | import javax.annotation.concurrent.ThreadSafe; | |
23 | ||
24 | import org.apache.arrow.util.Preconditions; | |
25 | ||
26 | /** | |
27 | * Provides a concurrent way to manage account for memory usage without locking. Used as basis | |
28 | * for Allocators. All | |
29 | * operations are threadsafe (except for close). | |
30 | */ | |
31 | @ThreadSafe | |
32 | class Accountant implements AutoCloseable { | |
33 | ||
34 | /** | |
35 | * The parent allocator. | |
36 | */ | |
37 | protected final Accountant parent; | |
38 | ||
39 | private final String name; | |
40 | ||
41 | /** | |
42 | * The amount of memory reserved for this allocator. Releases below this amount of memory will | |
43 | * not be returned to the | |
44 | * parent Accountant until this Accountant is closed. | |
45 | */ | |
46 | protected final long reservation; | |
47 | ||
48 | private final AtomicLong peakAllocation = new AtomicLong(); | |
49 | ||
50 | /** | |
51 | * Maximum local memory that can be held. This can be externally updated. Changing it won't | |
52 | * cause past memory to | |
53 | * change but will change responses to future allocation efforts | |
54 | */ | |
55 | private final AtomicLong allocationLimit = new AtomicLong(); | |
56 | ||
57 | /** | |
58 | * Currently allocated amount of memory. | |
59 | */ | |
60 | private final AtomicLong locallyHeldMemory = new AtomicLong(); | |
61 | ||
62 | public Accountant(Accountant parent, String name, long reservation, long maxAllocation) { | |
63 | Preconditions.checkNotNull(name, "name must not be null"); | |
64 | Preconditions.checkArgument(reservation >= 0, "The initial reservation size must be non-negative."); | |
65 | Preconditions.checkArgument(maxAllocation >= 0, "The maximum allocation limit must be non-negative."); | |
66 | Preconditions.checkArgument(reservation <= maxAllocation, | |
67 | "The initial reservation size must be <= the maximum allocation."); | |
68 | Preconditions.checkArgument(reservation == 0 || parent != null, "The root accountant can't reserve memory."); | |
69 | ||
70 | this.parent = parent; | |
71 | this.name = name; | |
72 | this.reservation = reservation; | |
73 | this.allocationLimit.set(maxAllocation); | |
74 | ||
75 | if (reservation != 0) { | |
76 | // we will allocate a reservation from our parent. | |
77 | final AllocationOutcome outcome = parent.allocateBytes(reservation); | |
78 | if (!outcome.isOk()) { | |
79 | throw new OutOfMemoryException(String.format( | |
80 | "Failure trying to allocate initial reservation for Allocator. " + | |
81 | "Attempted to allocate %d bytes.", reservation), outcome.getDetails()); | |
82 | } | |
83 | } | |
84 | } | |
85 | ||
86 | /** | |
87 | * Attempt to allocate the requested amount of memory. Either completely succeeds or completely | |
88 | * fails. If it fails, no changes are made to accounting. | |
89 | * | |
90 | * @param size The amount of memory to reserve in bytes. | |
91 | * @return the status and details of allocation at each allocator in the chain. | |
92 | */ | |
93 | AllocationOutcome allocateBytes(long size) { | |
94 | AllocationOutcome.Status status = allocateBytesInternal(size); | |
95 | if (status.isOk()) { | |
96 | return AllocationOutcome.SUCCESS_INSTANCE; | |
97 | } else { | |
98 | // Try again, but with details this time. | |
99 | // Populating details only on failures avoids performance overhead in the common case (success case). | |
100 | AllocationOutcomeDetails details = new AllocationOutcomeDetails(); | |
101 | status = allocateBytesInternal(size, details); | |
102 | return new AllocationOutcome(status, details); | |
103 | } | |
104 | } | |
105 | ||
106 | private AllocationOutcome.Status allocateBytesInternal(long size, AllocationOutcomeDetails details) { | |
107 | final AllocationOutcome.Status status = allocate(size, | |
108 | true /*incomingUpdatePeek*/, false /*forceAllocation*/, details); | |
109 | if (!status.isOk()) { | |
110 | releaseBytes(size); | |
111 | } | |
112 | return status; | |
113 | } | |
114 | ||
115 | private AllocationOutcome.Status allocateBytesInternal(long size) { | |
116 | return allocateBytesInternal(size, null /*details*/); | |
117 | } | |
118 | ||
119 | private void updatePeak() { | |
120 | final long currentMemory = locallyHeldMemory.get(); | |
121 | while (true) { | |
122 | ||
123 | final long previousPeak = peakAllocation.get(); | |
124 | if (currentMemory > previousPeak) { | |
125 | if (!peakAllocation.compareAndSet(previousPeak, currentMemory)) { | |
126 | // peak allocation changed underneath us. try again. | |
127 | continue; | |
128 | } | |
129 | } | |
130 | ||
131 | // we either succeeded to set peak allocation or we weren't above the previous peak, exit. | |
132 | return; | |
133 | } | |
134 | } | |
135 | ||
136 | ||
137 | /** | |
138 | * Increase the accounting. Returns whether the allocation fit within limits. | |
139 | * | |
140 | * @param size to increase | |
141 | * @return Whether the allocation fit within limits. | |
142 | */ | |
143 | public boolean forceAllocate(long size) { | |
144 | final AllocationOutcome.Status outcome = allocate(size, true, true, null); | |
145 | return outcome.isOk(); | |
146 | } | |
147 | ||
148 | /** | |
149 | * Internal method for allocation. This takes a forced approach to allocation to ensure that we | |
150 | * manage reservation | |
151 | * boundary issues consistently. Allocation is always done through the entire tree. The two | |
152 | * options that we influence | |
153 | * are whether the allocation should be forced and whether or not the peak memory allocation | |
154 | * should be updated. If at | |
155 | * some point during allocation escalation we determine that the allocation is no longer | |
156 | * possible, we will continue to | |
157 | * do a complete and consistent allocation but we will stop updating the peak allocation. We do | |
158 | * this because we know | |
159 | * that we will be directly unwinding this allocation (and thus never actually making the | |
160 | * allocation). If force | |
161 | * allocation is passed, then we continue to update the peak limits since we now know that this | |
162 | * allocation will occur | |
163 | * despite our moving past one or more limits. | |
164 | * | |
165 | * @param size The size of the allocation. | |
166 | * @param incomingUpdatePeak Whether we should update the local peak for this allocation. | |
167 | * @param forceAllocation Whether we should force the allocation. | |
168 | * @return The outcome of the allocation. | |
169 | */ | |
170 | private AllocationOutcome.Status allocate(final long size, final boolean incomingUpdatePeak, | |
171 | final boolean forceAllocation, AllocationOutcomeDetails details) { | |
172 | final long newLocal = locallyHeldMemory.addAndGet(size); | |
173 | final long beyondReservation = newLocal - reservation; | |
174 | final boolean beyondLimit = newLocal > allocationLimit.get(); | |
175 | final boolean updatePeak = forceAllocation || (incomingUpdatePeak && !beyondLimit); | |
176 | ||
177 | if (details != null) { | |
178 | // Add details if required (used in exceptions and debugging). | |
179 | boolean allocationFailed = true; | |
180 | long allocatedLocal = 0; | |
181 | if (!beyondLimit) { | |
182 | allocatedLocal = size - Math.min(beyondReservation, size); | |
183 | allocationFailed = false; | |
184 | } | |
185 | details.pushEntry(this, newLocal - size, size, allocatedLocal, allocationFailed); | |
186 | } | |
187 | ||
188 | AllocationOutcome.Status parentOutcome = AllocationOutcome.Status.SUCCESS; | |
189 | if (beyondReservation > 0 && parent != null) { | |
190 | // we need to get memory from our parent. | |
191 | final long parentRequest = Math.min(beyondReservation, size); | |
192 | parentOutcome = parent.allocate(parentRequest, updatePeak, forceAllocation, details); | |
193 | } | |
194 | ||
195 | final AllocationOutcome.Status finalOutcome; | |
196 | if (beyondLimit) { | |
197 | finalOutcome = AllocationOutcome.Status.FAILED_LOCAL; | |
198 | } else { | |
199 | finalOutcome = parentOutcome.isOk() ? AllocationOutcome.Status.SUCCESS | |
200 | : AllocationOutcome.Status.FAILED_PARENT; | |
201 | } | |
202 | ||
203 | if (updatePeak) { | |
204 | updatePeak(); | |
205 | } | |
206 | ||
207 | return finalOutcome; | |
208 | } | |
209 | ||
210 | public void releaseBytes(long size) { | |
211 | // reduce local memory. all memory released above reservation should be released up the tree. | |
212 | final long newSize = locallyHeldMemory.addAndGet(-size); | |
213 | ||
214 | Preconditions.checkArgument(newSize >= 0, "Accounted size went negative."); | |
215 | ||
216 | final long originalSize = newSize + size; | |
217 | if (originalSize > reservation && parent != null) { | |
218 | // we deallocated memory that we should release to our parent. | |
219 | final long possibleAmountToReleaseToParent = originalSize - reservation; | |
220 | final long actualToReleaseToParent = Math.min(size, possibleAmountToReleaseToParent); | |
221 | parent.releaseBytes(actualToReleaseToParent); | |
222 | } | |
223 | } | |
224 | ||
225 | public boolean isOverLimit() { | |
226 | return getAllocatedMemory() > getLimit() || (parent != null && parent.isOverLimit()); | |
227 | } | |
228 | ||
229 | /** | |
230 | * Close this Accountant. This will release any reservation bytes back to a parent Accountant. | |
231 | */ | |
232 | @Override | |
233 | public void close() { | |
234 | // return memory reservation to parent allocator. | |
235 | if (parent != null) { | |
236 | parent.releaseBytes(reservation); | |
237 | } | |
238 | } | |
239 | ||
240 | /** | |
241 | * Return the name of the accountant. | |
242 | * | |
243 | * @return name of accountant | |
244 | */ | |
245 | public String getName() { | |
246 | return name; | |
247 | } | |
248 | ||
249 | /** | |
250 | * Return the current limit of this Accountant. | |
251 | * | |
252 | * @return Limit in bytes. | |
253 | */ | |
254 | public long getLimit() { | |
255 | return allocationLimit.get(); | |
256 | } | |
257 | ||
258 | /** | |
259 | * Return the initial reservation. | |
260 | * | |
261 | * @return reservation in bytes. | |
262 | */ | |
263 | public long getInitReservation() { | |
264 | return reservation; | |
265 | } | |
266 | ||
267 | /** | |
268 | * Set the maximum amount of memory that can be allocated in the this Accountant before failing | |
269 | * an allocation. | |
270 | * | |
271 | * @param newLimit The limit in bytes. | |
272 | */ | |
273 | public void setLimit(long newLimit) { | |
274 | allocationLimit.set(newLimit); | |
275 | } | |
276 | ||
277 | /** | |
278 | * Return the current amount of allocated memory that this Accountant is managing accounting | |
279 | * for. Note this does not | |
280 | * include reservation memory that hasn't been allocated. | |
281 | * | |
282 | * @return Currently allocate memory in bytes. | |
283 | */ | |
284 | public long getAllocatedMemory() { | |
285 | return locallyHeldMemory.get(); | |
286 | } | |
287 | ||
288 | /** | |
289 | * The peak memory allocated by this Accountant. | |
290 | * | |
291 | * @return The peak allocated memory in bytes. | |
292 | */ | |
293 | public long getPeakMemoryAllocation() { | |
294 | return peakAllocation.get(); | |
295 | } | |
296 | ||
297 | public long getHeadroom() { | |
298 | long localHeadroom = allocationLimit.get() - locallyHeldMemory.get(); | |
299 | if (parent == null) { | |
300 | return localHeadroom; | |
301 | } | |
302 | ||
303 | // Amount of reserved memory left on top of what parent has | |
304 | long reservedHeadroom = Math.max(0, reservation - locallyHeldMemory.get()); | |
305 | return Math.min(localHeadroom, parent.getHeadroom() + reservedHeadroom); | |
306 | } | |
307 | ||
308 | } |