]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include "plasma/quota_aware_policy.h" | |
19 | #include "plasma/common.h" | |
20 | #include "plasma/plasma_allocator.h" | |
21 | ||
22 | #include <algorithm> | |
23 | #include <memory> | |
24 | #include <sstream> | |
25 | ||
26 | namespace plasma { | |
27 | ||
28 | QuotaAwarePolicy::QuotaAwarePolicy(PlasmaStoreInfo* store_info, int64_t max_size) | |
29 | : EvictionPolicy(store_info, max_size) {} | |
30 | ||
31 | bool QuotaAwarePolicy::HasQuota(Client* client, bool is_create) { | |
32 | if (!is_create) { | |
33 | return false; // no quota enforcement on read requests yet | |
34 | } | |
35 | return per_client_cache_.find(client) != per_client_cache_.end(); | |
36 | } | |
37 | ||
38 | void QuotaAwarePolicy::ObjectCreated(const ObjectID& object_id, Client* client, | |
39 | bool is_create) { | |
40 | if (HasQuota(client, is_create)) { | |
41 | per_client_cache_[client]->Add(object_id, GetObjectSize(object_id)); | |
42 | owned_by_client_[object_id] = client; | |
43 | } else { | |
44 | EvictionPolicy::ObjectCreated(object_id, client, is_create); | |
45 | } | |
46 | } | |
47 | ||
48 | bool QuotaAwarePolicy::SetClientQuota(Client* client, int64_t output_memory_quota) { | |
49 | if (per_client_cache_.find(client) != per_client_cache_.end()) { | |
50 | ARROW_LOG(WARNING) << "Cannot change the client quota once set"; | |
51 | return false; | |
52 | } | |
53 | ||
54 | if (cache_.Capacity() - output_memory_quota < | |
55 | cache_.OriginalCapacity() * kGlobalLruReserveFraction) { | |
56 | ARROW_LOG(WARNING) << "Not enough memory to set client quota: " << DebugString(); | |
57 | return false; | |
58 | } | |
59 | ||
60 | // those objects will be lazily evicted on the next call | |
61 | cache_.AdjustCapacity(-output_memory_quota); | |
62 | per_client_cache_[client] = | |
63 | std::unique_ptr<LRUCache>(new LRUCache(client->name, output_memory_quota)); | |
64 | return true; | |
65 | } | |
66 | ||
67 | bool QuotaAwarePolicy::EnforcePerClientQuota(Client* client, int64_t size, bool is_create, | |
68 | std::vector<ObjectID>* objects_to_evict) { | |
69 | if (!HasQuota(client, is_create)) { | |
70 | return true; | |
71 | } | |
72 | ||
73 | auto& client_cache = per_client_cache_[client]; | |
74 | if (size > client_cache->Capacity()) { | |
75 | ARROW_LOG(WARNING) << "object too large (" << size | |
76 | << " bytes) to fit in client quota " << client_cache->Capacity() | |
77 | << " " << DebugString(); | |
78 | return false; | |
79 | } | |
80 | ||
81 | if (client_cache->RemainingCapacity() >= size) { | |
82 | return true; | |
83 | } | |
84 | ||
85 | int64_t space_to_free = size - client_cache->RemainingCapacity(); | |
86 | if (space_to_free > 0) { | |
87 | std::vector<ObjectID> candidates; | |
88 | client_cache->ChooseObjectsToEvict(space_to_free, &candidates); | |
89 | for (ObjectID& object_id : candidates) { | |
90 | if (shared_for_read_.count(object_id)) { | |
91 | // Pinned so we can't evict it, so demote the object to global LRU instead. | |
92 | // We an do this by simply removing it from all data structures, so that | |
93 | // the next EndObjectAccess() will add it back to global LRU. | |
94 | shared_for_read_.erase(object_id); | |
95 | } else { | |
96 | objects_to_evict->push_back(object_id); | |
97 | } | |
98 | owned_by_client_.erase(object_id); | |
99 | client_cache->Remove(object_id); | |
100 | } | |
101 | } | |
102 | return true; | |
103 | } | |
104 | ||
105 | void QuotaAwarePolicy::BeginObjectAccess(const ObjectID& object_id) { | |
106 | if (owned_by_client_.find(object_id) != owned_by_client_.end()) { | |
107 | shared_for_read_.insert(object_id); | |
108 | pinned_memory_bytes_ += GetObjectSize(object_id); | |
109 | return; | |
110 | } | |
111 | EvictionPolicy::BeginObjectAccess(object_id); | |
112 | } | |
113 | ||
114 | void QuotaAwarePolicy::EndObjectAccess(const ObjectID& object_id) { | |
115 | if (owned_by_client_.find(object_id) != owned_by_client_.end()) { | |
116 | shared_for_read_.erase(object_id); | |
117 | pinned_memory_bytes_ -= GetObjectSize(object_id); | |
118 | return; | |
119 | } | |
120 | EvictionPolicy::EndObjectAccess(object_id); | |
121 | } | |
122 | ||
123 | void QuotaAwarePolicy::RemoveObject(const ObjectID& object_id) { | |
124 | if (owned_by_client_.find(object_id) != owned_by_client_.end()) { | |
125 | per_client_cache_[owned_by_client_[object_id]]->Remove(object_id); | |
126 | owned_by_client_.erase(object_id); | |
127 | shared_for_read_.erase(object_id); | |
128 | return; | |
129 | } | |
130 | EvictionPolicy::RemoveObject(object_id); | |
131 | } | |
132 | ||
133 | void QuotaAwarePolicy::RefreshObjects(const std::vector<ObjectID>& object_ids) { | |
134 | for (const auto& object_id : object_ids) { | |
135 | if (owned_by_client_.find(object_id) != owned_by_client_.end()) { | |
136 | int64_t size = per_client_cache_[owned_by_client_[object_id]]->Remove(object_id); | |
137 | per_client_cache_[owned_by_client_[object_id]]->Add(object_id, size); | |
138 | } | |
139 | } | |
140 | EvictionPolicy::RefreshObjects(object_ids); | |
141 | } | |
142 | ||
143 | void QuotaAwarePolicy::ClientDisconnected(Client* client) { | |
144 | if (per_client_cache_.find(client) == per_client_cache_.end()) { | |
145 | return; | |
146 | } | |
147 | // return capacity back to global LRU | |
148 | cache_.AdjustCapacity(per_client_cache_[client]->Capacity()); | |
149 | // clean up any entries used to track this client's quota usage | |
150 | per_client_cache_[client]->Foreach([this](const ObjectID& obj) { | |
151 | if (!shared_for_read_.count(obj)) { | |
152 | // only add it to the global LRU if we have it in pinned mode | |
153 | // otherwise, EndObjectAccess will add it later | |
154 | cache_.Add(obj, GetObjectSize(obj)); | |
155 | } | |
156 | owned_by_client_.erase(obj); | |
157 | shared_for_read_.erase(obj); | |
158 | }); | |
159 | per_client_cache_.erase(client); | |
160 | } | |
161 | ||
162 | std::string QuotaAwarePolicy::DebugString() const { | |
163 | std::stringstream result; | |
164 | result << "num clients with quota: " << per_client_cache_.size(); | |
165 | result << "\nquota map size: " << owned_by_client_.size(); | |
166 | result << "\npinned quota map size: " << shared_for_read_.size(); | |
167 | result << "\nallocated bytes: " << PlasmaAllocator::Allocated(); | |
168 | result << "\nallocation limit: " << PlasmaAllocator::GetFootprintLimit(); | |
169 | result << "\npinned bytes: " << pinned_memory_bytes_; | |
170 | result << cache_.DebugString(); | |
171 | for (const auto& pair : per_client_cache_) { | |
172 | result << pair.second->DebugString(); | |
173 | } | |
174 | return result.str(); | |
175 | } | |
176 | ||
177 | } // namespace plasma |