]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/plasma/quota_aware_policy.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / plasma / quota_aware_policy.cc
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "plasma/quota_aware_policy.h"
19#include "plasma/common.h"
20#include "plasma/plasma_allocator.h"
21
22#include <algorithm>
23#include <memory>
24#include <sstream>
25
26namespace plasma {
27
28QuotaAwarePolicy::QuotaAwarePolicy(PlasmaStoreInfo* store_info, int64_t max_size)
29 : EvictionPolicy(store_info, max_size) {}
30
31bool QuotaAwarePolicy::HasQuota(Client* client, bool is_create) {
32 if (!is_create) {
33 return false; // no quota enforcement on read requests yet
34 }
35 return per_client_cache_.find(client) != per_client_cache_.end();
36}
37
38void QuotaAwarePolicy::ObjectCreated(const ObjectID& object_id, Client* client,
39 bool is_create) {
40 if (HasQuota(client, is_create)) {
41 per_client_cache_[client]->Add(object_id, GetObjectSize(object_id));
42 owned_by_client_[object_id] = client;
43 } else {
44 EvictionPolicy::ObjectCreated(object_id, client, is_create);
45 }
46}
47
48bool QuotaAwarePolicy::SetClientQuota(Client* client, int64_t output_memory_quota) {
49 if (per_client_cache_.find(client) != per_client_cache_.end()) {
50 ARROW_LOG(WARNING) << "Cannot change the client quota once set";
51 return false;
52 }
53
54 if (cache_.Capacity() - output_memory_quota <
55 cache_.OriginalCapacity() * kGlobalLruReserveFraction) {
56 ARROW_LOG(WARNING) << "Not enough memory to set client quota: " << DebugString();
57 return false;
58 }
59
60 // those objects will be lazily evicted on the next call
61 cache_.AdjustCapacity(-output_memory_quota);
62 per_client_cache_[client] =
63 std::unique_ptr<LRUCache>(new LRUCache(client->name, output_memory_quota));
64 return true;
65}
66
67bool QuotaAwarePolicy::EnforcePerClientQuota(Client* client, int64_t size, bool is_create,
68 std::vector<ObjectID>* objects_to_evict) {
69 if (!HasQuota(client, is_create)) {
70 return true;
71 }
72
73 auto& client_cache = per_client_cache_[client];
74 if (size > client_cache->Capacity()) {
75 ARROW_LOG(WARNING) << "object too large (" << size
76 << " bytes) to fit in client quota " << client_cache->Capacity()
77 << " " << DebugString();
78 return false;
79 }
80
81 if (client_cache->RemainingCapacity() >= size) {
82 return true;
83 }
84
85 int64_t space_to_free = size - client_cache->RemainingCapacity();
86 if (space_to_free > 0) {
87 std::vector<ObjectID> candidates;
88 client_cache->ChooseObjectsToEvict(space_to_free, &candidates);
89 for (ObjectID& object_id : candidates) {
90 if (shared_for_read_.count(object_id)) {
91 // Pinned so we can't evict it, so demote the object to global LRU instead.
92 // We an do this by simply removing it from all data structures, so that
93 // the next EndObjectAccess() will add it back to global LRU.
94 shared_for_read_.erase(object_id);
95 } else {
96 objects_to_evict->push_back(object_id);
97 }
98 owned_by_client_.erase(object_id);
99 client_cache->Remove(object_id);
100 }
101 }
102 return true;
103}
104
105void QuotaAwarePolicy::BeginObjectAccess(const ObjectID& object_id) {
106 if (owned_by_client_.find(object_id) != owned_by_client_.end()) {
107 shared_for_read_.insert(object_id);
108 pinned_memory_bytes_ += GetObjectSize(object_id);
109 return;
110 }
111 EvictionPolicy::BeginObjectAccess(object_id);
112}
113
114void QuotaAwarePolicy::EndObjectAccess(const ObjectID& object_id) {
115 if (owned_by_client_.find(object_id) != owned_by_client_.end()) {
116 shared_for_read_.erase(object_id);
117 pinned_memory_bytes_ -= GetObjectSize(object_id);
118 return;
119 }
120 EvictionPolicy::EndObjectAccess(object_id);
121}
122
123void QuotaAwarePolicy::RemoveObject(const ObjectID& object_id) {
124 if (owned_by_client_.find(object_id) != owned_by_client_.end()) {
125 per_client_cache_[owned_by_client_[object_id]]->Remove(object_id);
126 owned_by_client_.erase(object_id);
127 shared_for_read_.erase(object_id);
128 return;
129 }
130 EvictionPolicy::RemoveObject(object_id);
131}
132
133void QuotaAwarePolicy::RefreshObjects(const std::vector<ObjectID>& object_ids) {
134 for (const auto& object_id : object_ids) {
135 if (owned_by_client_.find(object_id) != owned_by_client_.end()) {
136 int64_t size = per_client_cache_[owned_by_client_[object_id]]->Remove(object_id);
137 per_client_cache_[owned_by_client_[object_id]]->Add(object_id, size);
138 }
139 }
140 EvictionPolicy::RefreshObjects(object_ids);
141}
142
143void QuotaAwarePolicy::ClientDisconnected(Client* client) {
144 if (per_client_cache_.find(client) == per_client_cache_.end()) {
145 return;
146 }
147 // return capacity back to global LRU
148 cache_.AdjustCapacity(per_client_cache_[client]->Capacity());
149 // clean up any entries used to track this client's quota usage
150 per_client_cache_[client]->Foreach([this](const ObjectID& obj) {
151 if (!shared_for_read_.count(obj)) {
152 // only add it to the global LRU if we have it in pinned mode
153 // otherwise, EndObjectAccess will add it later
154 cache_.Add(obj, GetObjectSize(obj));
155 }
156 owned_by_client_.erase(obj);
157 shared_for_read_.erase(obj);
158 });
159 per_client_cache_.erase(client);
160}
161
162std::string QuotaAwarePolicy::DebugString() const {
163 std::stringstream result;
164 result << "num clients with quota: " << per_client_cache_.size();
165 result << "\nquota map size: " << owned_by_client_.size();
166 result << "\npinned quota map size: " << shared_for_read_.size();
167 result << "\nallocated bytes: " << PlasmaAllocator::Allocated();
168 result << "\nallocation limit: " << PlasmaAllocator::GetFootprintLimit();
169 result << "\npinned bytes: " << pinned_memory_bytes_;
170 result << cache_.DebugString();
171 for (const auto& pair : per_client_cache_) {
172 result << pair.second->DebugString();
173 }
174 return result.str();
175}
176
177} // namespace plasma