]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 TL |
3 | |
4 | /* | |
5 | * Ceph - scalable distributed file system | |
6 | * | |
7 | * Copyright (C) 2018 Red Hat, Inc. | |
8 | * | |
9 | * This is free software; you can redistribute it and/or | |
10 | * modify it under the terms of the GNU Lesser General Public | |
11 | * License version 2.1, as published by the Free Software | |
12 | * Foundation. See file COPYING. | |
13 | * | |
14 | */ | |
15 | ||
16 | #pragma once | |
17 | ||
#include <optional>
#include <utility>

#include "rgw_putobj.h"
#include "rgw_rados.h"
#include "services/svc_rados.h"
#include "services/svc_tier_rados.h"
11fdf7f2 TL |
24 | |
25 | namespace rgw { | |
26 | ||
27 | class Aio; | |
28 | ||
29 | namespace putobj { | |
30 | ||
31 | // a data consumer that writes an object in a bucket | |
32 | class ObjectProcessor : public DataProcessor { | |
33 | public: | |
34 | // prepare to start processing object data | |
9f95a23c | 35 | virtual int prepare(optional_yield y) = 0; |
11fdf7f2 TL |
36 | |
37 | // complete the operation and make its result visible to clients | |
38 | virtual int complete(size_t accounted_size, const std::string& etag, | |
39 | ceph::real_time *mtime, ceph::real_time set_mtime, | |
40 | std::map<std::string, bufferlist>& attrs, | |
41 | ceph::real_time delete_at, | |
42 | const char *if_match, const char *if_nomatch, | |
43 | const std::string *user_data, | |
9f95a23c TL |
44 | rgw_zone_set *zones_trace, bool *canceled, |
45 | optional_yield y) = 0; | |
11fdf7f2 TL |
46 | }; |
47 | ||
48 | // an object processor with special handling for the first chunk of the head. | |
49 | // the virtual process_first_chunk() function returns a processor to handle the | |
50 | // rest of the object | |
51 | class HeadObjectProcessor : public ObjectProcessor { | |
52 | uint64_t head_chunk_size; | |
53 | // buffer to capture the first chunk of the head object | |
54 | bufferlist head_data; | |
55 | // initialized after process_first_chunk() to process everything else | |
56 | DataProcessor *processor = nullptr; | |
57 | uint64_t data_offset = 0; // maximum offset of data written (ie compressed) | |
58 | protected: | |
59 | uint64_t get_actual_size() const { return data_offset; } | |
60 | ||
61 | // process the first chunk of data and return a processor for the rest | |
62 | virtual int process_first_chunk(bufferlist&& data, | |
63 | DataProcessor **processor) = 0; | |
64 | public: | |
65 | HeadObjectProcessor(uint64_t head_chunk_size) | |
66 | : head_chunk_size(head_chunk_size) | |
67 | {} | |
68 | ||
69 | void set_head_chunk_size(uint64_t size) { head_chunk_size = size; } | |
70 | ||
71 | // cache first chunk for process_first_chunk(), then forward everything else | |
72 | // to the returned processor | |
73 | int process(bufferlist&& data, uint64_t logical_offset) final override; | |
74 | }; | |
75 | ||
76 | ||
77 | using RawObjSet = std::set<rgw_raw_obj>; | |
78 | ||
79 | // a data sink that writes to rados objects and deletes them on cancelation | |
80 | class RadosWriter : public DataProcessor { | |
81 | Aio *const aio; | |
9f95a23c | 82 | rgw::sal::RGWRadosStore *const store; |
11fdf7f2 TL |
83 | const RGWBucketInfo& bucket_info; |
84 | RGWObjectCtx& obj_ctx; | |
85 | const rgw_obj head_obj; | |
86 | RGWSI_RADOS::Obj stripe_obj; // current stripe object | |
87 | RawObjSet written; // set of written objects for deletion | |
9f95a23c TL |
88 | const DoutPrefixProvider *dpp; |
89 | optional_yield y; | |
11fdf7f2 TL |
90 | |
91 | public: | |
9f95a23c TL |
92 | RadosWriter(Aio *aio, rgw::sal::RGWRadosStore *store, |
93 | const RGWBucketInfo& bucket_info, | |
94 | RGWObjectCtx& obj_ctx, const rgw_obj& head_obj, | |
95 | const DoutPrefixProvider *dpp, optional_yield y) | |
11fdf7f2 | 96 | : aio(aio), store(store), bucket_info(bucket_info), |
9f95a23c | 97 | obj_ctx(obj_ctx), head_obj(head_obj), dpp(dpp), y(y) |
11fdf7f2 TL |
98 | {} |
99 | ~RadosWriter(); | |
100 | ||
101 | // change the current stripe object | |
102 | int set_stripe_obj(const rgw_raw_obj& obj); | |
103 | ||
104 | // write the data at the given offset of the current stripe object | |
105 | int process(bufferlist&& data, uint64_t stripe_offset) override; | |
106 | ||
107 | // write the data as an exclusive create and wait for it to complete | |
108 | int write_exclusive(const bufferlist& data); | |
109 | ||
110 | int drain(); | |
111 | ||
112 | // when the operation completes successfully, clear the set of written objects | |
113 | // so they aren't deleted on destruction | |
114 | void clear_written() { written.clear(); } | |
9f95a23c | 115 | |
11fdf7f2 TL |
116 | }; |
117 | ||
118 | // a rados object processor that stripes according to RGWObjManifest | |
119 | class ManifestObjectProcessor : public HeadObjectProcessor, | |
120 | public StripeGenerator { | |
121 | protected: | |
9f95a23c | 122 | rgw::sal::RGWRadosStore *const store; |
11fdf7f2 TL |
123 | const RGWBucketInfo& bucket_info; |
124 | rgw_placement_rule tail_placement_rule; | |
9f95a23c | 125 | rgw_user owner; |
11fdf7f2 TL |
126 | RGWObjectCtx& obj_ctx; |
127 | rgw_obj head_obj; | |
128 | ||
129 | RadosWriter writer; | |
130 | RGWObjManifest manifest; | |
131 | RGWObjManifest::generator manifest_gen; | |
132 | ChunkProcessor chunk; | |
133 | StripeProcessor stripe; | |
9f95a23c | 134 | const DoutPrefixProvider *dpp; |
11fdf7f2 TL |
135 | |
136 | // implements StripeGenerator | |
137 | int next(uint64_t offset, uint64_t *stripe_size) override; | |
138 | ||
139 | public: | |
9f95a23c | 140 | ManifestObjectProcessor(Aio *aio, rgw::sal::RGWRadosStore *store, |
11fdf7f2 TL |
141 | const RGWBucketInfo& bucket_info, |
142 | const rgw_placement_rule *ptail_placement_rule, | |
143 | const rgw_user& owner, RGWObjectCtx& obj_ctx, | |
9f95a23c TL |
144 | const rgw_obj& head_obj, |
145 | const DoutPrefixProvider* dpp, optional_yield y) | |
11fdf7f2 TL |
146 | : HeadObjectProcessor(0), |
147 | store(store), bucket_info(bucket_info), | |
148 | owner(owner), | |
149 | obj_ctx(obj_ctx), head_obj(head_obj), | |
9f95a23c TL |
150 | writer(aio, store, bucket_info, obj_ctx, head_obj, dpp, y), |
151 | chunk(&writer, 0), stripe(&chunk, this, 0), dpp(dpp) { | |
11fdf7f2 TL |
152 | if (ptail_placement_rule) { |
153 | tail_placement_rule = *ptail_placement_rule; | |
154 | } | |
155 | } | |
156 | ||
9f95a23c TL |
157 | void set_owner(const rgw_user& _owner) { |
158 | owner = _owner; | |
159 | } | |
160 | ||
161 | void set_tail_placement(const rgw_placement_rule& tpr) { | |
162 | tail_placement_rule = tpr; | |
163 | } | |
11fdf7f2 TL |
164 | void set_tail_placement(const rgw_placement_rule&& tpr) { |
165 | tail_placement_rule = tpr; | |
166 | } | |
9f95a23c | 167 | |
11fdf7f2 TL |
168 | }; |
169 | ||
170 | ||
171 | // a processor that completes with an atomic write to the head object as part of | |
172 | // a bucket index transaction | |
173 | class AtomicObjectProcessor : public ManifestObjectProcessor { | |
174 | const std::optional<uint64_t> olh_epoch; | |
175 | const std::string unique_tag; | |
176 | bufferlist first_chunk; // written with the head in complete() | |
177 | ||
178 | int process_first_chunk(bufferlist&& data, DataProcessor **processor) override; | |
179 | public: | |
9f95a23c | 180 | AtomicObjectProcessor(Aio *aio, rgw::sal::RGWRadosStore *store, |
11fdf7f2 TL |
181 | const RGWBucketInfo& bucket_info, |
182 | const rgw_placement_rule *ptail_placement_rule, | |
183 | const rgw_user& owner, | |
184 | RGWObjectCtx& obj_ctx, const rgw_obj& head_obj, | |
185 | std::optional<uint64_t> olh_epoch, | |
9f95a23c TL |
186 | const std::string& unique_tag, |
187 | const DoutPrefixProvider *dpp, optional_yield y) | |
11fdf7f2 | 188 | : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule, |
9f95a23c | 189 | owner, obj_ctx, head_obj, dpp, y), |
11fdf7f2 TL |
190 | olh_epoch(olh_epoch), unique_tag(unique_tag) |
191 | {} | |
192 | ||
193 | // prepare a trivial manifest | |
9f95a23c | 194 | int prepare(optional_yield y) override; |
11fdf7f2 TL |
195 | // write the head object atomically in a bucket index transaction |
196 | int complete(size_t accounted_size, const std::string& etag, | |
197 | ceph::real_time *mtime, ceph::real_time set_mtime, | |
198 | std::map<std::string, bufferlist>& attrs, | |
199 | ceph::real_time delete_at, | |
200 | const char *if_match, const char *if_nomatch, | |
201 | const std::string *user_data, | |
9f95a23c TL |
202 | rgw_zone_set *zones_trace, bool *canceled, |
203 | optional_yield y) override; | |
11fdf7f2 TL |
204 | |
205 | }; | |
206 | ||
207 | ||
208 | // a processor for multipart parts, which don't require atomic completion. the | |
209 | // part's head is written with an exclusive create to detect racing uploads of | |
210 | // the same part/upload id, which are restarted with a random oid prefix | |
211 | class MultipartObjectProcessor : public ManifestObjectProcessor { | |
212 | const rgw_obj target_obj; // target multipart object | |
213 | const std::string upload_id; | |
214 | const int part_num; | |
215 | const std::string part_num_str; | |
216 | RGWMPObj mp; | |
217 | ||
218 | // write the first chunk and wait on aio->drain() for its completion. | |
219 | // on EEXIST, retry with random prefix | |
220 | int process_first_chunk(bufferlist&& data, DataProcessor **processor) override; | |
221 | // prepare the head stripe and manifest | |
222 | int prepare_head(); | |
223 | public: | |
9f95a23c | 224 | MultipartObjectProcessor(Aio *aio, rgw::sal::RGWRadosStore *store, |
11fdf7f2 TL |
225 | const RGWBucketInfo& bucket_info, |
226 | const rgw_placement_rule *ptail_placement_rule, | |
227 | const rgw_user& owner, RGWObjectCtx& obj_ctx, | |
228 | const rgw_obj& head_obj, | |
229 | const std::string& upload_id, uint64_t part_num, | |
9f95a23c TL |
230 | const std::string& part_num_str, |
231 | const DoutPrefixProvider *dpp, optional_yield y) | |
11fdf7f2 | 232 | : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule, |
9f95a23c | 233 | owner, obj_ctx, head_obj, dpp, y), |
11fdf7f2 TL |
234 | target_obj(head_obj), upload_id(upload_id), |
235 | part_num(part_num), part_num_str(part_num_str), | |
9f95a23c | 236 | mp(head_obj.key.name, upload_id) |
11fdf7f2 TL |
237 | {} |
238 | ||
239 | // prepare a multipart manifest | |
9f95a23c | 240 | int prepare(optional_yield y) override; |
11fdf7f2 TL |
241 | // write the head object attributes in a bucket index transaction, then |
242 | // register the completed part with the multipart meta object | |
243 | int complete(size_t accounted_size, const std::string& etag, | |
244 | ceph::real_time *mtime, ceph::real_time set_mtime, | |
245 | std::map<std::string, bufferlist>& attrs, | |
246 | ceph::real_time delete_at, | |
247 | const char *if_match, const char *if_nomatch, | |
248 | const std::string *user_data, | |
9f95a23c TL |
249 | rgw_zone_set *zones_trace, bool *canceled, |
250 | optional_yield y) override; | |
251 | ||
11fdf7f2 TL |
252 | }; |
253 | ||
254 | class AppendObjectProcessor : public ManifestObjectProcessor { | |
255 | uint64_t cur_part_num; | |
256 | uint64_t position; | |
257 | uint64_t cur_size; | |
258 | uint64_t *cur_accounted_size; | |
259 | string cur_etag; | |
260 | const std::string unique_tag; | |
261 | ||
262 | RGWObjManifest *cur_manifest; | |
263 | ||
264 | int process_first_chunk(bufferlist&& data, DataProcessor **processor) override; | |
265 | ||
266 | public: | |
9f95a23c | 267 | AppendObjectProcessor(Aio *aio, rgw::sal::RGWRadosStore *store, const RGWBucketInfo& bucket_info, |
11fdf7f2 TL |
268 | const rgw_placement_rule *ptail_placement_rule, |
269 | const rgw_user& owner, RGWObjectCtx& obj_ctx,const rgw_obj& head_obj, | |
9f95a23c TL |
270 | const std::string& unique_tag, uint64_t position, |
271 | uint64_t *cur_accounted_size, | |
272 | const DoutPrefixProvider *dpp, optional_yield y) | |
273 | : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule, | |
274 | owner, obj_ctx, head_obj, dpp, y), | |
11fdf7f2 TL |
275 | position(position), cur_size(0), cur_accounted_size(cur_accounted_size), |
276 | unique_tag(unique_tag), cur_manifest(nullptr) | |
277 | {} | |
9f95a23c | 278 | int prepare(optional_yield y) override; |
11fdf7f2 TL |
279 | int complete(size_t accounted_size, const string& etag, |
280 | ceph::real_time *mtime, ceph::real_time set_mtime, | |
281 | map<string, bufferlist>& attrs, ceph::real_time delete_at, | |
282 | const char *if_match, const char *if_nomatch, const string *user_data, | |
9f95a23c TL |
283 | rgw_zone_set *zones_trace, bool *canceled, |
284 | optional_yield y) override; | |
11fdf7f2 TL |
285 | }; |
286 | ||
287 | } // namespace putobj | |
288 | } // namespace rgw | |
289 |