]>
Commit | Line | Data |
---|---|---|
b32b8144 FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
11fdf7f2 | 3 | |
b32b8144 FG |
4 | /* |
5 | * Ceph - scalable distributed file system | |
6 | * | |
7 | * Copyright (C) 2017 Red Hat, Inc | |
8 | * | |
9 | * Author: Casey Bodley <cbodley@redhat.com> | |
10 | * | |
11 | * This is free software; you can redistribute it and/or | |
12 | * modify it under the terms of the GNU Lesser General Public | |
13 | * License version 2.1, as published by the Free Software | |
14 | * Foundation. See file COPYING. | |
15 | */ | |
16 | ||
17 | #include <mutex> | |
18 | #include <boost/circular_buffer.hpp> | |
19 | #include <boost/container/flat_map.hpp> | |
20 | ||
f64942e4 | 21 | #include "include/scope_guard.h" |
b32b8144 FG |
22 | #include "common/bounded_key_counter.h" |
23 | #include "common/errno.h" | |
24 | #include "rgw_sync_log_trim.h" | |
25 | #include "rgw_cr_rados.h" | |
26 | #include "rgw_cr_rest.h" | |
27 | #include "rgw_data_sync.h" | |
28 | #include "rgw_metadata.h" | |
29 | #include "rgw_rados.h" | |
11fdf7f2 | 30 | #include "rgw_zone.h" |
b32b8144 FG |
31 | #include "rgw_sync.h" |
32 | ||
11fdf7f2 TL |
33 | #include "services/svc_zone.h" |
34 | ||
b32b8144 | 35 | #include <boost/asio/yield.hpp> |
11fdf7f2 | 36 | #include "include/ceph_assert.h" |
b32b8144 FG |
37 | |
38 | #define dout_subsys ceph_subsys_rgw | |
39 | ||
40 | #undef dout_prefix | |
41 | #define dout_prefix (*_dout << "trim: ") | |
42 | ||
43 | using rgw::BucketTrimConfig; | |
44 | using BucketChangeCounter = BoundedKeyCounter<std::string, int>; | |
45 | ||
46 | const std::string rgw::BucketTrimStatus::oid = "bilog.trim"; | |
47 | using rgw::BucketTrimStatus; | |
48 | ||
49 | ||
// watch/notify api for gateways to coordinate about which buckets to trim
enum TrimNotifyType {
  NotifyTrimCounters = 0, // ask peers for their per-bucket change counters
  NotifyTrimComplete,     // announce that a trim pass finished; peers reset counters
};
// encoded as raw bytes (no versioning) — do not reorder or renumber values
WRITE_RAW_ENCODER(TrimNotifyType);
56 | ||
/// dispatch interface for a single TrimNotifyType: decode the request payload
/// from 'input' and encode a reply (possibly empty) into 'output'
struct TrimNotifyHandler {
  virtual ~TrimNotifyHandler() = default;

  virtual void handle(bufferlist::const_iterator& input, bufferlist& output) = 0;
};
62 | ||
/// api to share the bucket trim counters between gateways in the same zone.
/// each gateway will process different datalog shards, so the gateway that runs
/// the trim process needs to accumulate their counters
struct TrimCounters {
  /// counter for a single bucket
  struct BucketCounter {
    std::string bucket; //< bucket instance metadata key
    int count{0};       //< number of changes observed for this bucket

    BucketCounter() = default;
    BucketCounter(const std::string& bucket, int count)
      : bucket(bucket), count(count) {}

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };
  using Vector = std::vector<BucketCounter>;

  /// request bucket trim counters from peer gateways
  struct Request {
    uint16_t max_buckets; //< maximum number of bucket counters to return

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// return the current bucket trim counters
  struct Response {
    Vector bucket_counters;

    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// server interface to query the hottest buckets
  struct Server {
    virtual ~Server() = default;

    virtual void get_bucket_counters(int count, Vector& counters) = 0;
    virtual void reset_bucket_counters() = 0;
  };

  /// notify handler: decodes a Request, queries the server, encodes a Response
  class Handler : public TrimNotifyHandler {
    Server *const server; //< not owned; must outlive the handler
  public:
    explicit Handler(Server *server) : server(server) {}

    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
  };
};
114 | std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs) | |
115 | { | |
116 | return out << rhs.bucket << ":" << rhs.count; | |
117 | } | |
118 | ||
void TrimCounters::BucketCounter::encode(bufferlist& bl) const
{
  using ceph::encode;
  // no versioning to save space
  encode(bucket, bl);
  encode(count, bl);
}
void TrimCounters::BucketCounter::decode(bufferlist::const_iterator& p)
{
  using ceph::decode;
  // must mirror encode() field order exactly (no version header to guard it)
  decode(bucket, p);
  decode(count, p);
}
WRITE_CLASS_ENCODER(TrimCounters::BucketCounter);
133 | ||
void TrimCounters::Request::encode(bufferlist& bl) const
{
  // versioned encoding: (version=1, compat=1)
  ENCODE_START(1, 1, bl);
  encode(max_buckets, bl);
  ENCODE_FINISH(bl);
}
void TrimCounters::Request::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  decode(max_buckets, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Request);
147 | ||
void TrimCounters::Response::encode(bufferlist& bl) const
{
  // versioned encoding: (version=1, compat=1)
  ENCODE_START(1, 1, bl);
  encode(bucket_counters, bl);
  ENCODE_FINISH(bl);
}
void TrimCounters::Response::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  decode(bucket_counters, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimCounters::Response);
161 | ||
11fdf7f2 | 162 | void TrimCounters::Handler::handle(bufferlist::const_iterator& input, |
b32b8144 FG |
163 | bufferlist& output) |
164 | { | |
165 | Request request; | |
11fdf7f2 | 166 | decode(request, input); |
b32b8144 FG |
167 | auto count = std::min<uint16_t>(request.max_buckets, 128); |
168 | ||
169 | Response response; | |
170 | server->get_bucket_counters(count, response.bucket_counters); | |
11fdf7f2 | 171 | encode(response, output); |
b32b8144 FG |
172 | } |
173 | ||
/// api to notify peer gateways that trim has completed and their bucket change
/// counters can be reset
struct TrimComplete {
  /// empty request payload (still versioned on the wire)
  struct Request {
    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };
  /// empty response payload
  struct Response {
    void encode(bufferlist& bl) const;
    void decode(bufferlist::const_iterator& p);
  };

  /// server interface to reset bucket counters
  using Server = TrimCounters::Server;

  /// notify handler: resets the server's counters and acks with a Response
  class Handler : public TrimNotifyHandler {
    Server *const server; //< not owned; must outlive the handler
  public:
    explicit Handler(Server *server) : server(server) {}

    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
  };
};
198 | ||
void TrimComplete::Request::encode(bufferlist& bl) const
{
  // empty payload; version header allows future fields
  ENCODE_START(1, 1, bl);
  ENCODE_FINISH(bl);
}
void TrimComplete::Request::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Request);

void TrimComplete::Response::encode(bufferlist& bl) const
{
  // empty payload; version header allows future fields
  ENCODE_START(1, 1, bl);
  ENCODE_FINISH(bl);
}
void TrimComplete::Response::decode(bufferlist::const_iterator& p)
{
  DECODE_START(1, p);
  DECODE_FINISH(p);
}
WRITE_CLASS_ENCODER(TrimComplete::Response);
222 | ||
11fdf7f2 | 223 | void TrimComplete::Handler::handle(bufferlist::const_iterator& input, |
b32b8144 FG |
224 | bufferlist& output) |
225 | { | |
226 | Request request; | |
11fdf7f2 | 227 | decode(request, input); |
b32b8144 FG |
228 | |
229 | server->reset_bucket_counters(); | |
230 | ||
231 | Response response; | |
11fdf7f2 | 232 | encode(response, output); |
b32b8144 FG |
233 | } |
234 | ||
235 | ||
236 | /// rados watcher for bucket trim notifications | |
237 | class BucketTrimWatcher : public librados::WatchCtx2 { | |
238 | RGWRados *const store; | |
239 | const rgw_raw_obj& obj; | |
240 | rgw_rados_ref ref; | |
241 | uint64_t handle{0}; | |
242 | ||
243 | using HandlerPtr = std::unique_ptr<TrimNotifyHandler>; | |
244 | boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers; | |
245 | ||
246 | public: | |
247 | BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj, | |
248 | TrimCounters::Server *counters) | |
249 | : store(store), obj(obj) { | |
250 | handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters)); | |
251 | handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters)); | |
252 | } | |
253 | ||
254 | ~BucketTrimWatcher() { | |
255 | stop(); | |
256 | } | |
257 | ||
258 | int start() { | |
259 | int r = store->get_raw_obj_ref(obj, &ref); | |
260 | if (r < 0) { | |
261 | return r; | |
262 | } | |
263 | ||
264 | // register a watch on the realm's control object | |
11fdf7f2 | 265 | r = ref.ioctx.watch2(ref.obj.oid, &handle, this); |
b32b8144 FG |
266 | if (r == -ENOENT) { |
267 | constexpr bool exclusive = true; | |
11fdf7f2 | 268 | r = ref.ioctx.create(ref.obj.oid, exclusive); |
b32b8144 | 269 | if (r == -EEXIST || r == 0) { |
11fdf7f2 | 270 | r = ref.ioctx.watch2(ref.obj.oid, &handle, this); |
b32b8144 FG |
271 | } |
272 | } | |
273 | if (r < 0) { | |
11fdf7f2 | 274 | lderr(store->ctx()) << "Failed to watch " << ref.obj |
b32b8144 FG |
275 | << " with " << cpp_strerror(-r) << dendl; |
276 | ref.ioctx.close(); | |
277 | return r; | |
278 | } | |
279 | ||
11fdf7f2 | 280 | ldout(store->ctx(), 10) << "Watching " << ref.obj.oid << dendl; |
b32b8144 FG |
281 | return 0; |
282 | } | |
283 | ||
284 | int restart() { | |
285 | int r = ref.ioctx.unwatch2(handle); | |
286 | if (r < 0) { | |
11fdf7f2 | 287 | lderr(store->ctx()) << "Failed to unwatch on " << ref.obj |
b32b8144 FG |
288 | << " with " << cpp_strerror(-r) << dendl; |
289 | } | |
11fdf7f2 | 290 | r = ref.ioctx.watch2(ref.obj.oid, &handle, this); |
b32b8144 | 291 | if (r < 0) { |
11fdf7f2 | 292 | lderr(store->ctx()) << "Failed to restart watch on " << ref.obj |
b32b8144 FG |
293 | << " with " << cpp_strerror(-r) << dendl; |
294 | ref.ioctx.close(); | |
295 | } | |
296 | return r; | |
297 | } | |
298 | ||
299 | void stop() { | |
300 | if (handle) { | |
301 | ref.ioctx.unwatch2(handle); | |
302 | ref.ioctx.close(); | |
303 | } | |
304 | } | |
305 | ||
306 | /// respond to bucket trim notifications | |
307 | void handle_notify(uint64_t notify_id, uint64_t cookie, | |
308 | uint64_t notifier_id, bufferlist& bl) override { | |
309 | if (cookie != handle) { | |
310 | return; | |
311 | } | |
312 | bufferlist reply; | |
313 | try { | |
11fdf7f2 | 314 | auto p = bl.cbegin(); |
b32b8144 | 315 | TrimNotifyType type; |
11fdf7f2 | 316 | decode(type, p); |
b32b8144 FG |
317 | |
318 | auto handler = handlers.find(type); | |
319 | if (handler != handlers.end()) { | |
320 | handler->second->handle(p, reply); | |
321 | } else { | |
322 | lderr(store->ctx()) << "no handler for notify type " << type << dendl; | |
323 | } | |
324 | } catch (const buffer::error& e) { | |
325 | lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl; | |
326 | } | |
11fdf7f2 | 327 | ref.ioctx.notify_ack(ref.obj.oid, notify_id, cookie, reply); |
b32b8144 FG |
328 | } |
329 | ||
330 | /// reestablish the watch if it gets disconnected | |
331 | void handle_error(uint64_t cookie, int err) override { | |
332 | if (cookie != handle) { | |
333 | return; | |
334 | } | |
335 | if (err == -ENOTCONN) { | |
11fdf7f2 | 336 | ldout(store->ctx(), 4) << "Disconnected watch on " << ref.obj << dendl; |
b32b8144 FG |
337 | restart(); |
338 | } | |
339 | } | |
340 | }; | |
341 | ||
342 | ||
/// Interface to communicate with the trim manager about completed operations
struct BucketTrimObserver {
  virtual ~BucketTrimObserver() = default;

  /// record that a bucket instance was trimmed (takes ownership of the key)
  virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0;
  /// return true if this bucket instance was trimmed recently enough that
  /// it should be skipped this interval
  virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0;
};
350 | ||
/// populate the status with the minimum stable marker of each shard.
/// 'status' must be pre-sized to the shard count; every peer's status vector
/// must have the same size or -EINVAL is returned.
template <typename Iter>
int take_min_status(CephContext *cct, Iter first, Iter last,
                    std::vector<std::string> *status)
{
  for (auto peer = first; peer != last; ++peer) {
    if (peer->size() != status->size()) {
      // all peers must agree on the number of shards
      return -EINVAL;
    }
    auto m = status->begin();
    for (auto& shard : *peer) {
      auto& marker = *m++;
      // only consider incremental sync markers
      if (shard.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
        continue;
      }
      // always take the first marker, or any later marker that's smaller.
      // moving from the peer's shard data is safe: each shard of each peer
      // is visited exactly once
      if (peer == first || marker > shard.inc_marker.position) {
        marker = std::move(shard.inc_marker.position);
      }
    }
  }
  return 0;
}
376 | ||
/// trim each bilog shard to the given marker, while limiting the number of
/// concurrent requests
class BucketTrimShardCollectCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;
  RGWRados *const store;
  const RGWBucketInfo& bucket_info; //< held by reference; caller keeps it alive
  const std::vector<std::string>& markers; //< shard markers to trim
  size_t i{0}; //< index of current shard marker
 public:
  BucketTrimShardCollectCR(RGWRados *store, const RGWBucketInfo& bucket_info,
                           const std::vector<std::string>& markers)
    : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
      store(store), bucket_info(bucket_info), markers(markers)
  {}
  bool spawn_next() override;
};
393 | ||
394 | bool BucketTrimShardCollectCR::spawn_next() | |
395 | { | |
396 | while (i < markers.size()) { | |
397 | const auto& marker = markers[i]; | |
398 | const auto shard_id = i++; | |
399 | ||
400 | // skip empty markers | |
401 | if (!marker.empty()) { | |
402 | ldout(cct, 10) << "trimming bilog shard " << shard_id | |
403 | << " of " << bucket_info.bucket << " at marker " << marker << dendl; | |
404 | spawn(new RGWRadosBILogTrimCR(store, bucket_info, shard_id, | |
405 | std::string{}, marker), | |
406 | false); | |
407 | return true; | |
408 | } | |
409 | } | |
410 | return false; | |
411 | } | |
412 | ||
/// trim the bilog of all of the given bucket instance's shards
class BucketTrimInstanceCR : public RGWCoroutine {
  RGWRados *const store;
  RGWHTTPManager *const http;
  BucketTrimObserver *const observer;
  std::string bucket_instance;
  const std::string& zone_id; //< my zone id
  RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices
  int child_ret = 0; //< collected return code of spawned child coroutines

  using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
  std::vector<StatusShards> peer_status; //< sync status for each peer
  std::vector<std::string> min_markers; //< min marker per shard

 public:
  BucketTrimInstanceCR(RGWRados *store, RGWHTTPManager *http,
                       BucketTrimObserver *observer,
                       const std::string& bucket_instance)
    : RGWCoroutine(store->ctx()), store(store),
      http(http), observer(observer),
      bucket_instance(bucket_instance),
      zone_id(store->svc.zone->get_zone().id),
      // one status slot per peer zone that syncs from us
      peer_status(store->svc.zone->get_zone_data_notify_to_map().size())
  {}

  int operate() override;
};
440 | ||
int BucketTrimInstanceCR::operate()
{
  reenter(this) {
    ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl;

    // query peers for sync status
    set_status("fetching sync status from peers");
    yield {
      // query data sync status from each sync peer
      rgw_http_param_pair params[] = {
        { "type", "bucket-index" },
        { "status", nullptr },
        { "bucket", bucket_instance.c_str() },
        { "source-zone", zone_id.c_str() },
        { nullptr, nullptr }
      };

      auto p = peer_status.begin();
      for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) {
        using StatusCR = RGWReadRESTResourceCR<StatusShards>;
        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
              false);
        ++p;
      }
      // in parallel, read the local bucket instance info
      spawn(new RGWGetBucketInstanceInfoCR(store->get_async_rados(), store,
                                           bucket_instance, &bucket_info),
            false);
    }
    // wait for a response from each peer. all must respond to attempt trim
    while (num_spawned()) {
      yield wait_for_child();
      collect(&child_ret, nullptr);
      if (child_ret < 0) {
        // any peer failure aborts the trim for this bucket
        drain_all();
        return set_cr_error(child_ret);
      }
    }

    // initialize each shard with the maximum marker, which is only used when
    // there are no peers syncing from us
    min_markers.assign(std::max(1u, bucket_info.num_shards),
                       RGWSyncLogTrimCR::max_marker);

    // determine the minimum marker for each shard
    retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
                              &min_markers);
    if (retcode < 0) {
      ldout(cct, 4) << "failed to correlate bucket sync status from peers" << dendl;
      return set_cr_error(retcode);
    }

    // trim shards with a ShardCollectCR
    ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket
       << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
    set_status("trimming bilog shards");
    yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers));
    // ENODATA just means there were no keys to trim
    if (retcode == -ENODATA) {
      retcode = 0;
    }
    if (retcode < 0) {
      ldout(cct, 4) << "failed to trim bilog shards: "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }

    // tell the observer so this bucket is skipped for a while
    observer->on_bucket_trimmed(std::move(bucket_instance));
    return set_cr_done();
  }
  return 0;
}
513 | ||
/// trim each bucket instance while limiting the number of concurrent operations
class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
  RGWRados *const store;
  RGWHTTPManager *const http;
  BucketTrimObserver *const observer;
  // iterators into the caller's bucket list; caller keeps the vector alive
  std::vector<std::string>::const_iterator bucket;
  std::vector<std::string>::const_iterator end;
 public:
  BucketTrimInstanceCollectCR(RGWRados *store, RGWHTTPManager *http,
                              BucketTrimObserver *observer,
                              const std::vector<std::string>& buckets,
                              int max_concurrent)
    : RGWShardCollectCR(store->ctx(), max_concurrent),
      store(store), http(http), observer(observer),
      bucket(buckets.begin()), end(buckets.end())
  {}
  bool spawn_next() override;
};
532 | ||
533 | bool BucketTrimInstanceCollectCR::spawn_next() | |
534 | { | |
535 | if (bucket == end) { | |
536 | return false; | |
537 | } | |
538 | spawn(new BucketTrimInstanceCR(store, http, observer, *bucket), false); | |
539 | ++bucket; | |
540 | return true; | |
541 | } | |
542 | ||
543 | /// correlate the replies from each peer gateway into the given counter | |
544 | int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter) | |
545 | { | |
546 | counter.clear(); | |
547 | ||
548 | try { | |
549 | // decode notify responses | |
11fdf7f2 | 550 | auto p = bl.cbegin(); |
b32b8144 FG |
551 | std::map<std::pair<uint64_t, uint64_t>, bufferlist> replies; |
552 | std::set<std::pair<uint64_t, uint64_t>> timeouts; | |
11fdf7f2 TL |
553 | decode(replies, p); |
554 | decode(timeouts, p); | |
b32b8144 FG |
555 | |
556 | for (auto& peer : replies) { | |
11fdf7f2 | 557 | auto q = peer.second.cbegin(); |
b32b8144 | 558 | TrimCounters::Response response; |
11fdf7f2 | 559 | decode(response, q); |
b32b8144 FG |
560 | for (const auto& b : response.bucket_counters) { |
561 | counter.insert(b.bucket, b.count); | |
562 | } | |
563 | } | |
564 | } catch (const buffer::error& e) { | |
565 | return -EIO; | |
566 | } | |
567 | return 0; | |
568 | } | |
569 | ||
570 | /// metadata callback has the signature bool(string&& key, string&& marker) | |
571 | using MetadataListCallback = std::function<bool(std::string&&, std::string&&)>; | |
572 | ||
/// lists metadata keys, passing each to a callback until it returns false.
/// on reaching the end, it will restart at the beginning and list up to the
/// initial marker
class AsyncMetadataList : public RGWAsyncRadosRequest {
  CephContext *const cct;
  RGWMetadataManager *const mgr;
  const std::string section;      //< metadata section to list (copied)
  const std::string start_marker; //< start position; also the wrap-around stop
  MetadataListCallback callback;  //< invoked per key; return false to stop early

  int _send_request() override;
 public:
  AsyncMetadataList(CephContext *cct, RGWCoroutine *caller,
                    RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr,
                    const std::string& section, const std::string& start_marker,
                    const MetadataListCallback& callback)
    : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)
  {}
};
593 | ||
/// list keys starting at start_marker; if the callback keeps asking for more
/// after the end is reached, wrap around and list from the beginning up to
/// start_marker. runs synchronously in the async-rados worker thread.
int AsyncMetadataList::_send_request()
{
  void* handle = nullptr;
  std::list<std::string> keys;
  bool truncated{false};
  std::string marker;

  // start a listing at the given marker
  int r = mgr->list_keys_init(section, start_marker, &handle);
  if (r == -EINVAL) {
    // the marker was rejected (e.g. stale) — restart with empty marker below
  } else if (r < 0) {
    ldout(cct, 10) << "failed to init metadata listing: "
        << cpp_strerror(r) << dendl;
    return r;
  } else {
    ldout(cct, 20) << "starting metadata listing at " << start_marker << dendl;

    // release the handle when scope exits
    auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });

    do {
      // get the next key and marker
      r = mgr->list_keys_next(handle, 1, keys, &truncated);
      if (r < 0) {
        ldout(cct, 10) << "failed to list metadata: "
            << cpp_strerror(r) << dendl;
        return r;
      }
      marker = mgr->get_marker(handle);

      if (!keys.empty()) {
        ceph_assert(keys.size() == 1); // we asked for max=1 above
        auto& key = keys.front();
        if (!callback(std::move(key), std::move(marker))) {
          return 0; // callback is satisfied; stop listing
        }
      }
    } while (truncated);

    if (start_marker.empty()) {
      // already listed all keys — no wrap-around needed
      return 0;
    }
  }

  // restart the listing from the beginning (empty marker)
  handle = nullptr;

  r = mgr->list_keys_init(section, "", &handle);
  if (r < 0) {
    ldout(cct, 10) << "failed to restart metadata listing: "
        << cpp_strerror(r) << dendl;
    return r;
  }
  ldout(cct, 20) << "restarting metadata listing" << dendl;

  // release the handle when scope exits
  auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });
  do {
    // get the next key and marker
    r = mgr->list_keys_next(handle, 1, keys, &truncated);
    if (r < 0) {
      ldout(cct, 10) << "failed to list metadata: "
          << cpp_strerror(r) << dendl;
      return r;
    }
    marker = mgr->get_marker(handle);

    if (!keys.empty()) {
      ceph_assert(keys.size() == 1);
      auto& key = keys.front();
      // stop at original marker: the first pass already covered the rest
      if (marker > start_marker) {
        return 0;
      }
      if (!callback(std::move(key), std::move(marker))) {
        return 0;
      }
    }
  } while (truncated);

  return 0;
}
678 | ||
/// coroutine wrapper for AsyncMetadataList
class MetadataListCR : public RGWSimpleCoroutine {
  RGWAsyncRadosProcessor *const async_rados;
  RGWMetadataManager *const mgr;
  // NOTE: section and start_marker are held by reference; the caller's
  // strings must outlive this coroutine
  const std::string& section;
  const std::string& start_marker;
  MetadataListCallback callback;
  RGWAsyncRadosRequest *req{nullptr}; //< in-flight request; owned until finish()
 public:
  MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados,
                 RGWMetadataManager *mgr, const std::string& section,
                 const std::string& start_marker,
                 const MetadataListCallback& callback)
    : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr),
      section(section), start_marker(start_marker), callback(callback)
  {}
  ~MetadataListCR() override {
    request_cleanup();
  }

  int send_request() override {
    req = new AsyncMetadataList(cct, this, stack->create_completion_notifier(),
                                mgr, section, start_marker, callback);
    async_rados->queue(req);
    return 0;
  }
  int request_complete() override {
    return req->get_ret_status();
  }
  void request_cleanup() override {
    if (req) {
      req->finish(); // releases the request once the worker is done with it
      req = nullptr;
    }
  }
};
715 | ||
/// single bucket-trim pass: collects hot buckets from peers via watch/notify,
/// tops up with cold buckets from the metadata listing, trims them, persists
/// the updated marker, and notifies peers of completion
class BucketTrimCR : public RGWCoroutine {
  RGWRados *const store;
  RGWHTTPManager *const http;
  const BucketTrimConfig& config;
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj; //< trim status/control object
  ceph::mono_time start_time;
  bufferlist notify_replies; //< raw replies from the counters notify
  BucketChangeCounter counter; //< accumulated per-bucket change counts
  std::vector<std::string> buckets; //< buckets selected for trim
  BucketTrimStatus status;
  RGWObjVersionTracker objv; //< version tracker for trim status object
  std::string last_cold_marker; //< position for next trim marker

  static const std::string section; //< metadata section for bucket instances
 public:
  BucketTrimCR(RGWRados *store, RGWHTTPManager *http,
               const BucketTrimConfig& config, BucketTrimObserver *observer,
               const rgw_raw_obj& obj)
    : RGWCoroutine(store->ctx()), store(store), http(http), config(config),
      observer(observer), obj(obj), counter(config.counter_size)
  {}

  int operate() override;
};

const std::string BucketTrimCR::section{"bucket.instance"};
743 | ||
int BucketTrimCR::operate()
{
  reenter(this) {
    start_time = ceph::mono_clock::now();

    if (config.buckets_per_interval) {
      // query watch/notify for hot buckets
      ldout(cct, 10) << "fetching active bucket counters" << dendl;
      set_status("fetching active bucket counters");
      yield {
        // request the top bucket counters from each peer gateway
        const TrimNotifyType type = NotifyTrimCounters;
        TrimCounters::Request request{32};
        bufferlist bl;
        encode(type, bl);
        encode(request, bl);
        call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                  &notify_replies));
      }
      if (retcode < 0) {
        ldout(cct, 10) << "failed to fetch peer bucket counters" << dendl;
        return set_cr_error(retcode);
      }

      // select the hottest buckets for trim
      retcode = accumulate_peer_counters(notify_replies, counter);
      if (retcode < 0) {
        ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl;
        return set_cr_error(retcode);
      }
      buckets.reserve(config.buckets_per_interval);

      // leave room for at least min_cold_buckets_per_interval cold buckets
      const int max_count = config.buckets_per_interval -
        config.min_cold_buckets_per_interval;
      counter.get_highest(max_count,
        [this] (const std::string& bucket, int count) {
          buckets.push_back(bucket);
        });
    }

    if (buckets.size() < config.buckets_per_interval) {
      // read BucketTrimStatus for marker position
      set_status("reading trim status");
      using ReadStatus = RGWSimpleRadosReadCR<BucketTrimStatus>;
      yield call(new ReadStatus(store->get_async_rados(), store->svc.sysobj, obj,
                                &status, true, &objv));
      if (retcode < 0) {
        ldout(cct, 10) << "failed to read bilog trim status: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      if (status.marker == "MAX") {
        status.marker.clear(); // restart at the beginning
      }
      ldout(cct, 10) << "listing cold buckets from marker="
          << status.marker << dendl;

      set_status("listing cold buckets for trim");
      yield {
        // capture a reference so 'this' remains valid in the callback
        auto ref = boost::intrusive_ptr<RGWCoroutine>{this};
        // list cold buckets to consider for trim
        auto cb = [this, ref] (std::string&& bucket, std::string&& marker) {
          // filter out keys that we trimmed recently
          if (observer->trimmed_recently(bucket)) {
            return true;
          }
          // filter out active buckets that we've already selected
          auto i = std::find(buckets.begin(), buckets.end(), bucket);
          if (i != buckets.end()) {
            return true;
          }
          buckets.emplace_back(std::move(bucket));
          // remember the last cold bucket spawned to update the status marker
          last_cold_marker = std::move(marker);
          // return true if there's room for more
          return buckets.size() < config.buckets_per_interval;
        };

        call(new MetadataListCR(cct, store->get_async_rados(), store->meta_mgr,
                                section, status.marker, cb));
      }
      if (retcode < 0) {
        ldout(cct, 4) << "failed to list bucket instance metadata: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
    }

    // trim bucket instances with limited concurrency
    set_status("trimming buckets");
    ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl;
    yield call(new BucketTrimInstanceCollectCR(store, http, observer, buckets,
                                               config.concurrent_buckets));
    // ignore errors from individual buckets

    // write updated trim status
    if (!last_cold_marker.empty() && status.marker != last_cold_marker) {
      set_status("writing updated trim status");
      status.marker = std::move(last_cold_marker);
      ldout(cct, 20) << "writing bucket trim marker=" << status.marker << dendl;
      using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
      yield call(new WriteStatus(store->get_async_rados(), store->svc.sysobj, obj,
                                 status, &objv));
      if (retcode < 0) {
        ldout(cct, 4) << "failed to write updated trim status: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
    }

    // notify peers that trim completed
    set_status("trim completed");
    yield {
      const TrimNotifyType type = NotifyTrimComplete;
      TrimComplete::Request request;
      bufferlist bl;
      encode(type, bl);
      encode(request, bl);
      call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
                                nullptr));
    }
    if (retcode < 0) {
      // non-fatal for the data, but peers keep counting already-trimmed changes
      ldout(cct, 10) << "failed to notify peers of trim completion" << dendl;
      return set_cr_error(retcode);
    }

    ldout(cct, 4) << "bucket index log processing completed in "
        << ceph::mono_clock::now() - start_time << dendl;
    return set_cr_done();
  }
  return 0;
}
877 | ||
// coroutine that loops forever: sleep for the configured trim interval,
// acquire the shared trim lock, then run one BucketTrimCR pass. the lock
// duration matches the wait interval, so a successful holder effectively
// keeps other gateways from trimming until the next interval
class BucketTrimPollCR : public RGWCoroutine {
  RGWRados *const store;
  RGWHTTPManager *const http;
  const BucketTrimConfig& config;
  BucketTrimObserver *const observer;
  const rgw_raw_obj& obj; //< rados object used for the trim status and lock
  const std::string name{"trim"}; //< lock name
  const std::string cookie; //< random cookie identifying this lock owner

 public:
  BucketTrimPollCR(RGWRados *store, RGWHTTPManager *http,
                   const BucketTrimConfig& config,
                   BucketTrimObserver *observer, const rgw_raw_obj& obj)
    : RGWCoroutine(store->ctx()), store(store), http(http),
      config(config), observer(observer), obj(obj),
      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
  {}

  int operate() override;
};
898 | ||
// poll loop: sleep, lock, trim, repeat. note that after a successful trim the
// lock is deliberately left to expire on its own (its duration equals the wait
// interval), which throttles trim activity across gateways
int BucketTrimPollCR::operate()
{
  reenter(this) {
    for (;;) {
      set_status("sleeping");
      wait(utime_t{static_cast<time_t>(config.trim_interval_sec), 0});

      // prevent others from trimming for our entire wait interval
      set_status("acquiring trim lock");
      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
                                          obj, name, cookie,
                                          config.trim_interval_sec));
      if (retcode < 0) {
        // couldn't take the lock (e.g. held by another gateway); retry after
        // the next wait interval
        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
        continue;
      }

      set_status("trimming");
      yield call(new BucketTrimCR(store, http, config, observer, obj));
      if (retcode < 0) {
        // on errors, unlock so other gateways can try
        set_status("unlocking");
        yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
                                              obj, name, cookie));
      }
    }
  }
  return 0;
}
928 | ||
929 | /// tracks a bounded list of events with timestamps. old events can be expired, | |
930 | /// and recent events can be searched by key. expiration depends on events being | |
931 | /// inserted in temporal order | |
932 | template <typename T, typename Clock = ceph::coarse_mono_clock> | |
933 | class RecentEventList { | |
934 | public: | |
935 | using clock_type = Clock; | |
936 | using time_point = typename clock_type::time_point; | |
937 | ||
938 | RecentEventList(size_t max_size, const ceph::timespan& max_duration) | |
939 | : events(max_size), max_duration(max_duration) | |
940 | {} | |
941 | ||
942 | /// insert an event at the given point in time. this time must be at least as | |
943 | /// recent as the last inserted event | |
944 | void insert(T&& value, const time_point& now) { | |
11fdf7f2 | 945 | // ceph_assert(events.empty() || now >= events.back().time) |
b32b8144 FG |
946 | events.push_back(Event{std::move(value), now}); |
947 | } | |
948 | ||
949 | /// performs a linear search for an event matching the given key, whose type | |
950 | /// U can be any that provides operator==(U, T) | |
951 | template <typename U> | |
952 | bool lookup(const U& key) const { | |
953 | for (const auto& event : events) { | |
954 | if (key == event.value) { | |
955 | return true; | |
956 | } | |
957 | } | |
958 | return false; | |
959 | } | |
960 | ||
961 | /// remove events that are no longer recent compared to the given point in time | |
962 | void expire_old(const time_point& now) { | |
963 | const auto expired_before = now - max_duration; | |
964 | while (!events.empty() && events.front().time < expired_before) { | |
965 | events.pop_front(); | |
966 | } | |
967 | } | |
968 | ||
969 | private: | |
970 | struct Event { | |
971 | T value; | |
972 | time_point time; | |
973 | }; | |
974 | boost::circular_buffer<Event> events; | |
975 | const ceph::timespan max_duration; | |
976 | }; | |
977 | ||
978 | namespace rgw { | |
979 | ||
980 | // read bucket trim configuration from ceph context | |
981 | void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config) | |
982 | { | |
11fdf7f2 | 983 | const auto& conf = cct->_conf; |
b32b8144 FG |
984 | |
985 | config.trim_interval_sec = | |
11fdf7f2 | 986 | conf.get_val<int64_t>("rgw_sync_log_trim_interval"); |
b32b8144 FG |
987 | config.counter_size = 512; |
988 | config.buckets_per_interval = | |
11fdf7f2 | 989 | conf.get_val<int64_t>("rgw_sync_log_trim_max_buckets"); |
b32b8144 | 990 | config.min_cold_buckets_per_interval = |
11fdf7f2 | 991 | conf.get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets"); |
b32b8144 | 992 | config.concurrent_buckets = |
11fdf7f2 | 993 | conf.get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets"); |
b32b8144 FG |
994 | config.notify_timeout_ms = 10000; |
995 | config.recent_size = 128; | |
996 | config.recent_duration = std::chrono::hours(2); | |
997 | } | |
998 | ||
999 | class BucketTrimManager::Impl : public TrimCounters::Server, | |
1000 | public BucketTrimObserver { | |
1001 | public: | |
1002 | RGWRados *const store; | |
1003 | const BucketTrimConfig config; | |
1004 | ||
1005 | const rgw_raw_obj status_obj; | |
1006 | ||
1007 | /// count frequency of bucket instance entries in the data changes log | |
1008 | BucketChangeCounter counter; | |
1009 | ||
1010 | using RecentlyTrimmedBucketList = RecentEventList<std::string>; | |
1011 | using clock_type = RecentlyTrimmedBucketList::clock_type; | |
1012 | /// track recently trimmed buckets to focus trim activity elsewhere | |
1013 | RecentlyTrimmedBucketList trimmed; | |
1014 | ||
1015 | /// serve the bucket trim watch/notify api | |
1016 | BucketTrimWatcher watcher; | |
1017 | ||
1018 | /// protect data shared between data sync, trim, and watch/notify threads | |
1019 | std::mutex mutex; | |
1020 | ||
1021 | Impl(RGWRados *store, const BucketTrimConfig& config) | |
1022 | : store(store), config(config), | |
11fdf7f2 | 1023 | status_obj(store->svc.zone->get_zone_params().log_pool, BucketTrimStatus::oid), |
b32b8144 FG |
1024 | counter(config.counter_size), |
1025 | trimmed(config.recent_size, config.recent_duration), | |
1026 | watcher(store, status_obj, this) | |
1027 | {} | |
1028 | ||
1029 | /// TrimCounters::Server interface for watch/notify api | |
1030 | void get_bucket_counters(int count, TrimCounters::Vector& buckets) { | |
1031 | buckets.reserve(count); | |
1032 | std::lock_guard<std::mutex> lock(mutex); | |
1033 | counter.get_highest(count, [&buckets] (const std::string& key, int count) { | |
1034 | buckets.emplace_back(key, count); | |
1035 | }); | |
1036 | ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl; | |
1037 | } | |
1038 | ||
1039 | void reset_bucket_counters() override { | |
1040 | ldout(store->ctx(), 20) << "bucket trim completed" << dendl; | |
1041 | std::lock_guard<std::mutex> lock(mutex); | |
1042 | counter.clear(); | |
1043 | trimmed.expire_old(clock_type::now()); | |
1044 | } | |
1045 | ||
1046 | /// BucketTrimObserver interface to remember successfully-trimmed buckets | |
1047 | void on_bucket_trimmed(std::string&& bucket_instance) override { | |
1048 | ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl; | |
1049 | std::lock_guard<std::mutex> lock(mutex); | |
1050 | trimmed.insert(std::move(bucket_instance), clock_type::now()); | |
1051 | } | |
1052 | ||
1053 | bool trimmed_recently(const boost::string_view& bucket_instance) override { | |
1054 | std::lock_guard<std::mutex> lock(mutex); | |
1055 | return trimmed.lookup(bucket_instance); | |
1056 | } | |
1057 | }; | |
1058 | ||
// construct the pimpl; the watcher is not started here — callers must invoke
// init() before trim coordination becomes active
BucketTrimManager::BucketTrimManager(RGWRados *store,
                                     const BucketTrimConfig& config)
  : impl(new Impl(store, config))
{
}
// out-of-line default so Impl's definition is complete at destruction
BucketTrimManager::~BucketTrimManager() = default;
1065 | ||
// start the watch/notify handler that gateways use to coordinate trim;
// returns the watcher's error code (0 on success)
int BucketTrimManager::init()
{
  return impl->watcher.start();
}
1070 | ||
1071 | void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket) | |
1072 | { | |
1073 | std::lock_guard<std::mutex> lock(impl->mutex); | |
1074 | // filter recently trimmed bucket instances out of bucket change counter | |
1075 | if (impl->trimmed.lookup(bucket)) { | |
1076 | return; | |
1077 | } | |
1078 | impl->counter.insert(bucket.to_string()); | |
1079 | } | |
1080 | ||
// create the long-running coroutine that polls on the trim interval; caller
// takes ownership of the returned coroutine
RGWCoroutine* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager *http)
{
  return new BucketTrimPollCR(impl->store, http, impl->config,
                              impl.get(), impl->status_obj);
}
1086 | ||
// create a coroutine for a single on-demand trim pass (e.g. for admin use);
// caller takes ownership of the returned coroutine
RGWCoroutine* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager *http)
{
  // return the trim coroutine without any polling
  return new BucketTrimCR(impl->store, http, impl->config,
                          impl.get(), impl->status_obj);
}
1093 | ||
1094 | } // namespace rgw |