]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | /* | |
5 | * Ceph - scalable distributed file system | |
6 | * | |
7 | * Copyright (C) 2020 Red Hat, Inc | |
8 | * | |
9 | * This is free software; you can redistribute it and/or | |
10 | * modify it under the terms of the GNU Lesser General Public | |
11 | * License version 2.1, as published by the Free Software | |
12 | * Foundation. See file COPYING. | |
13 | */ | |
14 | ||
15 | #include "rgw_sync_error_repo.h" | |
16 | #include "rgw_coroutine.h" | |
17 | #include "rgw_sal.h" | |
18 | #include "services/svc_rados.h" | |
19 | #include "cls/cmpomap/client.h" | |
20 | ||
1e59de90 TL |
21 | namespace rgw::error_repo { |
22 | ||
// marker byte written in front of every binary-encoded key. 0x80 can never
// be the first byte of a utf8 code point, so a key starting with it cannot
// be confused with one of the pre-existing plain string keys. this keeps
// the binary encoding backward-compatible with those older keys
constexpr uint8_t binary_key_prefix{0x80};
28 | ||
29 | struct key_type { | |
30 | rgw_bucket_shard bs; | |
31 | std::optional<uint64_t> gen; | |
32 | }; | |
33 | ||
34 | void encode(const key_type& k, bufferlist& bl, uint64_t f=0) | |
35 | { | |
36 | ENCODE_START(1, 1, bl); | |
37 | encode(k.bs, bl); | |
38 | encode(k.gen, bl); | |
39 | ENCODE_FINISH(bl); | |
40 | } | |
41 | ||
42 | void decode(key_type& k, bufferlist::const_iterator& bl) | |
43 | { | |
44 | DECODE_START(1, bl); | |
45 | decode(k.bs, bl); | |
46 | decode(k.gen, bl); | |
47 | DECODE_FINISH(bl); | |
48 | } | |
49 | ||
50 | std::string encode_key(const rgw_bucket_shard& bs, | |
51 | std::optional<uint64_t> gen) | |
52 | { | |
53 | using ceph::encode; | |
54 | const auto key = key_type{bs, gen}; | |
55 | bufferlist bl; | |
56 | encode(binary_key_prefix, bl); | |
57 | encode(key, bl); | |
58 | return bl.to_str(); | |
59 | } | |
60 | ||
61 | int decode_key(std::string encoded, | |
62 | rgw_bucket_shard& bs, | |
63 | std::optional<uint64_t>& gen) | |
64 | { | |
65 | using ceph::decode; | |
66 | key_type key; | |
67 | const auto bl = bufferlist::static_from_string(encoded); | |
68 | auto p = bl.cbegin(); | |
69 | try { | |
70 | uint8_t prefix; | |
71 | decode(prefix, p); | |
72 | if (prefix != binary_key_prefix) { | |
73 | return -EINVAL; | |
74 | } | |
75 | decode(key, p); | |
76 | } catch (const buffer::error&) { | |
77 | return -EIO; | |
78 | } | |
79 | if (!p.end()) { | |
80 | return -EIO; // buffer contained unexpected bytes | |
81 | } | |
82 | bs = std::move(key.bs); | |
83 | gen = key.gen; | |
84 | return 0; | |
85 | } | |
86 | ||
87 | ceph::real_time decode_value(const bufferlist& bl) | |
f67539c2 TL |
88 | { |
89 | uint64_t value; | |
90 | try { | |
91 | using ceph::decode; | |
92 | decode(value, bl); | |
93 | } catch (const buffer::error&) { | |
94 | value = 0; // empty buffer = 0 | |
95 | } | |
96 | return ceph::real_clock::zero() + ceph::timespan(value); | |
97 | } | |
98 | ||
1e59de90 TL |
99 | int write(librados::ObjectWriteOperation& op, |
100 | const std::string& key, | |
101 | ceph::real_time timestamp) | |
f67539c2 TL |
102 | { |
103 | // overwrite the existing timestamp if value is greater | |
104 | const uint64_t value = timestamp.time_since_epoch().count(); | |
1e59de90 | 105 | using namespace ::cls::cmpomap; |
f67539c2 TL |
106 | const bufferlist zero = u64_buffer(0); // compare against 0 for missing keys |
107 | return cmp_set_vals(op, Mode::U64, Op::GT, {{key, u64_buffer(value)}}, zero); | |
108 | } | |
109 | ||
1e59de90 TL |
110 | int remove(librados::ObjectWriteOperation& op, |
111 | const std::string& key, | |
112 | ceph::real_time timestamp) | |
f67539c2 TL |
113 | { |
114 | // remove the omap key if value >= existing | |
115 | const uint64_t value = timestamp.time_since_epoch().count(); | |
1e59de90 | 116 | using namespace ::cls::cmpomap; |
f67539c2 TL |
117 | return cmp_rm_keys(op, Mode::U64, Op::GTE, {{key, u64_buffer(value)}}); |
118 | } | |
119 | ||
120 | class RGWErrorRepoWriteCR : public RGWSimpleCoroutine { | |
121 | RGWSI_RADOS::Obj obj; | |
122 | std::string key; | |
123 | ceph::real_time timestamp; | |
124 | ||
125 | boost::intrusive_ptr<RGWAioCompletionNotifier> cn; | |
126 | public: | |
127 | RGWErrorRepoWriteCR(RGWSI_RADOS* rados, const rgw_raw_obj& raw_obj, | |
128 | const std::string& key, ceph::real_time timestamp) | |
129 | : RGWSimpleCoroutine(rados->ctx()), | |
130 | obj(rados->obj(raw_obj)), | |
131 | key(key), timestamp(timestamp) | |
132 | {} | |
133 | ||
b3b6e05e | 134 | int send_request(const DoutPrefixProvider *dpp) override { |
f67539c2 | 135 | librados::ObjectWriteOperation op; |
1e59de90 | 136 | int r = write(op, key, timestamp); |
f67539c2 TL |
137 | if (r < 0) { |
138 | return r; | |
139 | } | |
b3b6e05e | 140 | r = obj.open(dpp); |
f67539c2 TL |
141 | if (r < 0) { |
142 | return r; | |
143 | } | |
144 | ||
145 | cn = stack->create_completion_notifier(); | |
146 | return obj.aio_operate(cn->completion(), &op); | |
147 | } | |
148 | ||
149 | int request_complete() override { | |
150 | return cn->completion()->get_return_value(); | |
151 | } | |
152 | }; | |
153 | ||
1e59de90 TL |
154 | RGWCoroutine* write_cr(RGWSI_RADOS* rados, |
155 | const rgw_raw_obj& obj, | |
156 | const std::string& key, | |
157 | ceph::real_time timestamp) | |
f67539c2 TL |
158 | { |
159 | return new RGWErrorRepoWriteCR(rados, obj, key, timestamp); | |
160 | } | |
161 | ||
162 | ||
163 | class RGWErrorRepoRemoveCR : public RGWSimpleCoroutine { | |
164 | RGWSI_RADOS::Obj obj; | |
165 | std::string key; | |
166 | ceph::real_time timestamp; | |
167 | ||
168 | boost::intrusive_ptr<RGWAioCompletionNotifier> cn; | |
169 | public: | |
170 | RGWErrorRepoRemoveCR(RGWSI_RADOS* rados, const rgw_raw_obj& raw_obj, | |
171 | const std::string& key, ceph::real_time timestamp) | |
172 | : RGWSimpleCoroutine(rados->ctx()), | |
173 | obj(rados->obj(raw_obj)), | |
174 | key(key), timestamp(timestamp) | |
175 | {} | |
176 | ||
b3b6e05e | 177 | int send_request(const DoutPrefixProvider *dpp) override { |
f67539c2 | 178 | librados::ObjectWriteOperation op; |
1e59de90 | 179 | int r = remove(op, key, timestamp); |
f67539c2 TL |
180 | if (r < 0) { |
181 | return r; | |
182 | } | |
b3b6e05e | 183 | r = obj.open(dpp); |
f67539c2 TL |
184 | if (r < 0) { |
185 | return r; | |
186 | } | |
187 | ||
188 | cn = stack->create_completion_notifier(); | |
189 | return obj.aio_operate(cn->completion(), &op); | |
190 | } | |
191 | ||
192 | int request_complete() override { | |
193 | return cn->completion()->get_return_value(); | |
194 | } | |
195 | }; | |
196 | ||
1e59de90 TL |
197 | RGWCoroutine* remove_cr(RGWSI_RADOS* rados, |
198 | const rgw_raw_obj& obj, | |
199 | const std::string& key, | |
200 | ceph::real_time timestamp) | |
f67539c2 TL |
201 | { |
202 | return new RGWErrorRepoRemoveCR(rados, obj, key, timestamp); | |
203 | } | |
1e59de90 TL |
204 | |
205 | } // namespace rgw::error_repo |