]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/java/memory/memory-core/src/main/java/org/apache/arrow/memory/Accountant.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / memory / memory-core / src / main / java / org / apache / arrow / memory / Accountant.java
CommitLineData
1d09f67e
TL
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.arrow.memory;
19
20import java.util.concurrent.atomic.AtomicLong;
21
22import javax.annotation.concurrent.ThreadSafe;
23
24import org.apache.arrow.util.Preconditions;
25
26/**
27 * Provides a concurrent way to manage account for memory usage without locking. Used as basis
28 * for Allocators. All
29 * operations are threadsafe (except for close).
30 */
31@ThreadSafe
32class Accountant implements AutoCloseable {
33
34 /**
35 * The parent allocator.
36 */
37 protected final Accountant parent;
38
39 private final String name;
40
41 /**
42 * The amount of memory reserved for this allocator. Releases below this amount of memory will
43 * not be returned to the
44 * parent Accountant until this Accountant is closed.
45 */
46 protected final long reservation;
47
48 private final AtomicLong peakAllocation = new AtomicLong();
49
50 /**
51 * Maximum local memory that can be held. This can be externally updated. Changing it won't
52 * cause past memory to
53 * change but will change responses to future allocation efforts
54 */
55 private final AtomicLong allocationLimit = new AtomicLong();
56
57 /**
58 * Currently allocated amount of memory.
59 */
60 private final AtomicLong locallyHeldMemory = new AtomicLong();
61
62 public Accountant(Accountant parent, String name, long reservation, long maxAllocation) {
63 Preconditions.checkNotNull(name, "name must not be null");
64 Preconditions.checkArgument(reservation >= 0, "The initial reservation size must be non-negative.");
65 Preconditions.checkArgument(maxAllocation >= 0, "The maximum allocation limit must be non-negative.");
66 Preconditions.checkArgument(reservation <= maxAllocation,
67 "The initial reservation size must be <= the maximum allocation.");
68 Preconditions.checkArgument(reservation == 0 || parent != null, "The root accountant can't reserve memory.");
69
70 this.parent = parent;
71 this.name = name;
72 this.reservation = reservation;
73 this.allocationLimit.set(maxAllocation);
74
75 if (reservation != 0) {
76 // we will allocate a reservation from our parent.
77 final AllocationOutcome outcome = parent.allocateBytes(reservation);
78 if (!outcome.isOk()) {
79 throw new OutOfMemoryException(String.format(
80 "Failure trying to allocate initial reservation for Allocator. " +
81 "Attempted to allocate %d bytes.", reservation), outcome.getDetails());
82 }
83 }
84 }
85
86 /**
87 * Attempt to allocate the requested amount of memory. Either completely succeeds or completely
88 * fails. If it fails, no changes are made to accounting.
89 *
90 * @param size The amount of memory to reserve in bytes.
91 * @return the status and details of allocation at each allocator in the chain.
92 */
93 AllocationOutcome allocateBytes(long size) {
94 AllocationOutcome.Status status = allocateBytesInternal(size);
95 if (status.isOk()) {
96 return AllocationOutcome.SUCCESS_INSTANCE;
97 } else {
98 // Try again, but with details this time.
99 // Populating details only on failures avoids performance overhead in the common case (success case).
100 AllocationOutcomeDetails details = new AllocationOutcomeDetails();
101 status = allocateBytesInternal(size, details);
102 return new AllocationOutcome(status, details);
103 }
104 }
105
106 private AllocationOutcome.Status allocateBytesInternal(long size, AllocationOutcomeDetails details) {
107 final AllocationOutcome.Status status = allocate(size,
108 true /*incomingUpdatePeek*/, false /*forceAllocation*/, details);
109 if (!status.isOk()) {
110 releaseBytes(size);
111 }
112 return status;
113 }
114
115 private AllocationOutcome.Status allocateBytesInternal(long size) {
116 return allocateBytesInternal(size, null /*details*/);
117 }
118
119 private void updatePeak() {
120 final long currentMemory = locallyHeldMemory.get();
121 while (true) {
122
123 final long previousPeak = peakAllocation.get();
124 if (currentMemory > previousPeak) {
125 if (!peakAllocation.compareAndSet(previousPeak, currentMemory)) {
126 // peak allocation changed underneath us. try again.
127 continue;
128 }
129 }
130
131 // we either succeeded to set peak allocation or we weren't above the previous peak, exit.
132 return;
133 }
134 }
135
136
137 /**
138 * Increase the accounting. Returns whether the allocation fit within limits.
139 *
140 * @param size to increase
141 * @return Whether the allocation fit within limits.
142 */
143 public boolean forceAllocate(long size) {
144 final AllocationOutcome.Status outcome = allocate(size, true, true, null);
145 return outcome.isOk();
146 }
147
148 /**
149 * Internal method for allocation. This takes a forced approach to allocation to ensure that we
150 * manage reservation
151 * boundary issues consistently. Allocation is always done through the entire tree. The two
152 * options that we influence
153 * are whether the allocation should be forced and whether or not the peak memory allocation
154 * should be updated. If at
155 * some point during allocation escalation we determine that the allocation is no longer
156 * possible, we will continue to
157 * do a complete and consistent allocation but we will stop updating the peak allocation. We do
158 * this because we know
159 * that we will be directly unwinding this allocation (and thus never actually making the
160 * allocation). If force
161 * allocation is passed, then we continue to update the peak limits since we now know that this
162 * allocation will occur
163 * despite our moving past one or more limits.
164 *
165 * @param size The size of the allocation.
166 * @param incomingUpdatePeak Whether we should update the local peak for this allocation.
167 * @param forceAllocation Whether we should force the allocation.
168 * @return The outcome of the allocation.
169 */
170 private AllocationOutcome.Status allocate(final long size, final boolean incomingUpdatePeak,
171 final boolean forceAllocation, AllocationOutcomeDetails details) {
172 final long newLocal = locallyHeldMemory.addAndGet(size);
173 final long beyondReservation = newLocal - reservation;
174 final boolean beyondLimit = newLocal > allocationLimit.get();
175 final boolean updatePeak = forceAllocation || (incomingUpdatePeak && !beyondLimit);
176
177 if (details != null) {
178 // Add details if required (used in exceptions and debugging).
179 boolean allocationFailed = true;
180 long allocatedLocal = 0;
181 if (!beyondLimit) {
182 allocatedLocal = size - Math.min(beyondReservation, size);
183 allocationFailed = false;
184 }
185 details.pushEntry(this, newLocal - size, size, allocatedLocal, allocationFailed);
186 }
187
188 AllocationOutcome.Status parentOutcome = AllocationOutcome.Status.SUCCESS;
189 if (beyondReservation > 0 && parent != null) {
190 // we need to get memory from our parent.
191 final long parentRequest = Math.min(beyondReservation, size);
192 parentOutcome = parent.allocate(parentRequest, updatePeak, forceAllocation, details);
193 }
194
195 final AllocationOutcome.Status finalOutcome;
196 if (beyondLimit) {
197 finalOutcome = AllocationOutcome.Status.FAILED_LOCAL;
198 } else {
199 finalOutcome = parentOutcome.isOk() ? AllocationOutcome.Status.SUCCESS
200 : AllocationOutcome.Status.FAILED_PARENT;
201 }
202
203 if (updatePeak) {
204 updatePeak();
205 }
206
207 return finalOutcome;
208 }
209
210 public void releaseBytes(long size) {
211 // reduce local memory. all memory released above reservation should be released up the tree.
212 final long newSize = locallyHeldMemory.addAndGet(-size);
213
214 Preconditions.checkArgument(newSize >= 0, "Accounted size went negative.");
215
216 final long originalSize = newSize + size;
217 if (originalSize > reservation && parent != null) {
218 // we deallocated memory that we should release to our parent.
219 final long possibleAmountToReleaseToParent = originalSize - reservation;
220 final long actualToReleaseToParent = Math.min(size, possibleAmountToReleaseToParent);
221 parent.releaseBytes(actualToReleaseToParent);
222 }
223 }
224
225 public boolean isOverLimit() {
226 return getAllocatedMemory() > getLimit() || (parent != null && parent.isOverLimit());
227 }
228
229 /**
230 * Close this Accountant. This will release any reservation bytes back to a parent Accountant.
231 */
232 @Override
233 public void close() {
234 // return memory reservation to parent allocator.
235 if (parent != null) {
236 parent.releaseBytes(reservation);
237 }
238 }
239
240 /**
241 * Return the name of the accountant.
242 *
243 * @return name of accountant
244 */
245 public String getName() {
246 return name;
247 }
248
249 /**
250 * Return the current limit of this Accountant.
251 *
252 * @return Limit in bytes.
253 */
254 public long getLimit() {
255 return allocationLimit.get();
256 }
257
258 /**
259 * Return the initial reservation.
260 *
261 * @return reservation in bytes.
262 */
263 public long getInitReservation() {
264 return reservation;
265 }
266
267 /**
268 * Set the maximum amount of memory that can be allocated in the this Accountant before failing
269 * an allocation.
270 *
271 * @param newLimit The limit in bytes.
272 */
273 public void setLimit(long newLimit) {
274 allocationLimit.set(newLimit);
275 }
276
277 /**
278 * Return the current amount of allocated memory that this Accountant is managing accounting
279 * for. Note this does not
280 * include reservation memory that hasn't been allocated.
281 *
282 * @return Currently allocate memory in bytes.
283 */
284 public long getAllocatedMemory() {
285 return locallyHeldMemory.get();
286 }
287
288 /**
289 * The peak memory allocated by this Accountant.
290 *
291 * @return The peak allocated memory in bytes.
292 */
293 public long getPeakMemoryAllocation() {
294 return peakAllocation.get();
295 }
296
297 public long getHeadroom() {
298 long localHeadroom = allocationLimit.get() - locallyHeldMemory.get();
299 if (parent == null) {
300 return localHeadroom;
301 }
302
303 // Amount of reserved memory left on top of what parent has
304 long reservedHeadroom = Math.max(0, reservation - locallyHeldMemory.get());
305 return Math.min(localHeadroom, parent.getHeadroom() + reservedHeadroom);
306 }
307
308}