1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2020 Red Hat, Inc
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
17 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
18 #include "common/intrusive_lru.h"
19 #include "rgw_data_sync.h"
21 namespace rgw::bucket_sync
{
23 // per bucket-shard state cached by DataSyncShardCR
25 // the source bucket shard to sync
26 std::pair
<rgw_bucket_shard
, std::optional
<uint64_t>> key
;
27 // current sync obligation being processed by DataSyncSingleEntry
28 std::optional
<rgw_data_sync_obligation
> obligation
;
29 // incremented with each new obligation
31 // highest timestamp applied by all sources
32 ceph::real_time progress_timestamp
;
34 State(const std::pair
<rgw_bucket_shard
, std::optional
<uint64_t>>& key
) noexcept
36 State(const rgw_bucket_shard
& shard
, std::optional
<uint64_t> gen
) noexcept
44 using lru_config
= ceph::common::intrusive_lru_config
<
45 std::pair
<rgw_bucket_shard
, std::optional
<uint64_t>>, Entry
, EntryToKey
>;
47 // a recyclable cache entry
48 struct Entry
: State
, ceph::common::intrusive_lru_base
<lru_config
> {
53 using type
= std::pair
<rgw_bucket_shard
, std::optional
<uint64_t>>;
54 const type
& operator()(const Entry
& e
) { return e
.key
; }
57 // use a non-atomic reference count since these aren't shared across threads
59 using thread_unsafe_ref_counter
= boost::intrusive_ref_counter
<
60 T
, boost::thread_unsafe_counter
>;
62 // a state cache for entries within a single datalog shard
63 class Cache
: public thread_unsafe_ref_counter
<Cache
> {
64 ceph::common::intrusive_lru
<lru_config
> cache
;
66 // protected ctor to enforce the use of factory function create()
67 explicit Cache(size_t target_size
) {
68 cache
.set_target_size(target_size
);
71 static boost::intrusive_ptr
<Cache
> create(size_t target_size
) {
72 return new Cache(target_size
);
75 // find or create a cache entry for the given key, and return a Handle that
76 // keeps it lru-pinned until destruction
77 Handle
get(const rgw_bucket_shard
& shard
, std::optional
<uint64_t> gen
);
80 // a State handle that keeps the Cache referenced
82 boost::intrusive_ptr
<Cache
> cache
;
83 boost::intrusive_ptr
<Entry
> entry
;
85 Handle() noexcept
= default;
87 Handle(boost::intrusive_ptr
<Cache
> cache
,
88 boost::intrusive_ptr
<Entry
> entry
) noexcept
89 : cache(std::move(cache
)), entry(std::move(entry
)) {}
90 Handle(Handle
&&) = default;
91 Handle(const Handle
&) = default;
92 Handle
& operator=(Handle
&& o
) noexcept
{
93 // move the entry first so that its cache stays referenced over destruction
94 entry
= std::move(o
.entry
);
95 cache
= std::move(o
.cache
);
98 Handle
& operator=(const Handle
& o
) noexcept
{
99 // copy the entry first so that its cache stays referenced over destruction
105 explicit operator bool() const noexcept
{ return static_cast<bool>(entry
); }
106 State
& operator*() const noexcept
{ return *entry
; }
107 State
* operator->() const noexcept
{ return entry
.get(); }
110 inline Handle
Cache::get(const rgw_bucket_shard
& shard
, std::optional
<uint64_t> gen
)
112 auto result
= cache
.get_or_create({ shard
, gen
});
113 return {this, std::move(result
.first
)};
116 } // namespace rgw::bucket_sync