]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 TL |
3 | |
4 | /* | |
5 | * Ceph - scalable distributed file system | |
6 | * | |
7 | * Copyright (C) 2018 Red Hat, Inc. | |
8 | * | |
9 | * This is free software; you can redistribute it and/or | |
10 | * modify it under the terms of the GNU Lesser General Public | |
11 | * License version 2.1, as published by the Free Software | |
12 | * Foundation. See file COPYING. | |
13 | * | |
14 | */ | |
15 | ||
16 | #pragma once | |
17 | ||
18 | #include <optional> | |
19 | ||
20 | #include "rgw_putobj.h" | |
11fdf7f2 | 21 | #include "services/svc_rados.h" |
9f95a23c | 22 | #include "services/svc_tier_rados.h" |
f67539c2 TL |
23 | #include "rgw_sal.h" |
24 | #include "rgw_obj_manifest.h" | |
11fdf7f2 TL |
25 | |
26 | namespace rgw { | |
27 | ||
20effc67 TL |
28 | namespace sal { |
29 | class RadosStore; | |
30 | } | |
31 | ||
11fdf7f2 TL |
32 | class Aio; |
33 | ||
34 | namespace putobj { | |
35 | ||
11fdf7f2 TL |
36 | // an object processor with special handling for the first chunk of the head. |
37 | // the virtual process_first_chunk() function returns a processor to handle the | |
38 | // rest of the object | |
20effc67 | 39 | class HeadObjectProcessor : public rgw::sal::ObjectProcessor { |
11fdf7f2 TL |
40 | uint64_t head_chunk_size; |
41 | // buffer to capture the first chunk of the head object | |
42 | bufferlist head_data; | |
43 | // initialized after process_first_chunk() to process everything else | |
20effc67 | 44 | rgw::sal::DataProcessor *processor = nullptr; |
11fdf7f2 TL |
45 | uint64_t data_offset = 0; // maximum offset of data written (ie compressed) |
46 | protected: | |
47 | uint64_t get_actual_size() const { return data_offset; } | |
48 | ||
49 | // process the first chunk of data and return a processor for the rest | |
50 | virtual int process_first_chunk(bufferlist&& data, | |
20effc67 | 51 | rgw::sal::DataProcessor **processor) = 0; |
11fdf7f2 TL |
52 | public: |
53 | HeadObjectProcessor(uint64_t head_chunk_size) | |
54 | : head_chunk_size(head_chunk_size) | |
55 | {} | |
56 | ||
57 | void set_head_chunk_size(uint64_t size) { head_chunk_size = size; } | |
58 | ||
59 | // cache first chunk for process_first_chunk(), then forward everything else | |
60 | // to the returned processor | |
61 | int process(bufferlist&& data, uint64_t logical_offset) final override; | |
62 | }; | |
63 | ||
11fdf7f2 TL |
64 | using RawObjSet = std::set<rgw_raw_obj>; |
65 | ||
66 | // a data sink that writes to rados objects and deletes them on cancelation | |
20effc67 | 67 | class RadosWriter : public rgw::sal::DataProcessor { |
11fdf7f2 | 68 | Aio *const aio; |
20effc67 | 69 | rgw::sal::RadosStore *const store; |
11fdf7f2 | 70 | RGWObjectCtx& obj_ctx; |
20effc67 | 71 | std::unique_ptr<rgw::sal::Object> head_obj; |
11fdf7f2 TL |
72 | RGWSI_RADOS::Obj stripe_obj; // current stripe object |
73 | RawObjSet written; // set of written objects for deletion | |
9f95a23c TL |
74 | const DoutPrefixProvider *dpp; |
75 | optional_yield y; | |
11fdf7f2 TL |
76 | |
77 | public: | |
20effc67 TL |
78 | RadosWriter(Aio *aio, rgw::sal::RadosStore *store, |
79 | RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj, | |
9f95a23c | 80 | const DoutPrefixProvider *dpp, optional_yield y) |
20effc67 | 81 | : aio(aio), store(store), |
f67539c2 | 82 | obj_ctx(obj_ctx), head_obj(std::move(_head_obj)), dpp(dpp), y(y) |
11fdf7f2 | 83 | {} |
f67539c2 | 84 | RadosWriter(RadosWriter&& r) |
20effc67 | 85 | : aio(r.aio), store(r.store), |
f67539c2 TL |
86 | obj_ctx(r.obj_ctx), head_obj(std::move(r.head_obj)), dpp(r.dpp), y(r.y) |
87 | {} | |
88 | ||
11fdf7f2 TL |
89 | ~RadosWriter(); |
90 | ||
20effc67 TL |
91 | // add alloc hint to osd |
92 | void add_write_hint(librados::ObjectWriteOperation& op); | |
93 | ||
11fdf7f2 TL |
94 | // change the current stripe object |
95 | int set_stripe_obj(const rgw_raw_obj& obj); | |
96 | ||
97 | // write the data at the given offset of the current stripe object | |
98 | int process(bufferlist&& data, uint64_t stripe_offset) override; | |
99 | ||
100 | // write the data as an exclusive create and wait for it to complete | |
101 | int write_exclusive(const bufferlist& data); | |
102 | ||
103 | int drain(); | |
104 | ||
105 | // when the operation completes successfully, clear the set of written objects | |
106 | // so they aren't deleted on destruction | |
107 | void clear_written() { written.clear(); } | |
9f95a23c | 108 | |
11fdf7f2 TL |
109 | }; |
110 | ||
20effc67 | 111 | |
11fdf7f2 TL |
112 | // a rados object processor that stripes according to RGWObjManifest |
113 | class ManifestObjectProcessor : public HeadObjectProcessor, | |
114 | public StripeGenerator { | |
115 | protected: | |
20effc67 | 116 | rgw::sal::RadosStore* const store; |
11fdf7f2 | 117 | rgw_placement_rule tail_placement_rule; |
9f95a23c | 118 | rgw_user owner; |
11fdf7f2 | 119 | RGWObjectCtx& obj_ctx; |
20effc67 | 120 | std::unique_ptr<rgw::sal::Object> head_obj; |
11fdf7f2 TL |
121 | |
122 | RadosWriter writer; | |
123 | RGWObjManifest manifest; | |
124 | RGWObjManifest::generator manifest_gen; | |
125 | ChunkProcessor chunk; | |
126 | StripeProcessor stripe; | |
9f95a23c | 127 | const DoutPrefixProvider *dpp; |
11fdf7f2 TL |
128 | |
129 | // implements StripeGenerator | |
130 | int next(uint64_t offset, uint64_t *stripe_size) override; | |
131 | ||
132 | public: | |
20effc67 | 133 | ManifestObjectProcessor(Aio *aio, rgw::sal::RadosStore* store, |
11fdf7f2 TL |
134 | const rgw_placement_rule *ptail_placement_rule, |
135 | const rgw_user& owner, RGWObjectCtx& obj_ctx, | |
20effc67 | 136 | std::unique_ptr<rgw::sal::Object> _head_obj, |
9f95a23c | 137 | const DoutPrefixProvider* dpp, optional_yield y) |
11fdf7f2 | 138 | : HeadObjectProcessor(0), |
20effc67 | 139 | store(store), |
11fdf7f2 | 140 | owner(owner), |
f67539c2 | 141 | obj_ctx(obj_ctx), head_obj(std::move(_head_obj)), |
20effc67 | 142 | writer(aio, store, obj_ctx, head_obj->clone(), dpp, y), |
9f95a23c | 143 | chunk(&writer, 0), stripe(&chunk, this, 0), dpp(dpp) { |
11fdf7f2 TL |
144 | if (ptail_placement_rule) { |
145 | tail_placement_rule = *ptail_placement_rule; | |
146 | } | |
147 | } | |
148 | ||
9f95a23c TL |
149 | void set_owner(const rgw_user& _owner) { |
150 | owner = _owner; | |
151 | } | |
152 | ||
153 | void set_tail_placement(const rgw_placement_rule& tpr) { | |
154 | tail_placement_rule = tpr; | |
155 | } | |
11fdf7f2 TL |
156 | void set_tail_placement(const rgw_placement_rule&& tpr) { |
157 | tail_placement_rule = tpr; | |
158 | } | |
9f95a23c | 159 | |
11fdf7f2 TL |
160 | }; |
161 | ||
162 | ||
163 | // a processor that completes with an atomic write to the head object as part of | |
164 | // a bucket index transaction | |
165 | class AtomicObjectProcessor : public ManifestObjectProcessor { | |
166 | const std::optional<uint64_t> olh_epoch; | |
167 | const std::string unique_tag; | |
168 | bufferlist first_chunk; // written with the head in complete() | |
169 | ||
20effc67 | 170 | int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override; |
11fdf7f2 | 171 | public: |
20effc67 | 172 | AtomicObjectProcessor(Aio *aio, rgw::sal::RadosStore* store, |
11fdf7f2 TL |
173 | const rgw_placement_rule *ptail_placement_rule, |
174 | const rgw_user& owner, | |
f67539c2 | 175 | RGWObjectCtx& obj_ctx, |
20effc67 | 176 | std::unique_ptr<rgw::sal::Object> _head_obj, |
11fdf7f2 | 177 | std::optional<uint64_t> olh_epoch, |
9f95a23c TL |
178 | const std::string& unique_tag, |
179 | const DoutPrefixProvider *dpp, optional_yield y) | |
20effc67 | 180 | : ManifestObjectProcessor(aio, store, ptail_placement_rule, |
f67539c2 | 181 | owner, obj_ctx, std::move(_head_obj), dpp, y), |
11fdf7f2 TL |
182 | olh_epoch(olh_epoch), unique_tag(unique_tag) |
183 | {} | |
184 | ||
185 | // prepare a trivial manifest | |
9f95a23c | 186 | int prepare(optional_yield y) override; |
11fdf7f2 TL |
187 | // write the head object atomically in a bucket index transaction |
188 | int complete(size_t accounted_size, const std::string& etag, | |
189 | ceph::real_time *mtime, ceph::real_time set_mtime, | |
190 | std::map<std::string, bufferlist>& attrs, | |
191 | ceph::real_time delete_at, | |
192 | const char *if_match, const char *if_nomatch, | |
193 | const std::string *user_data, | |
9f95a23c TL |
194 | rgw_zone_set *zones_trace, bool *canceled, |
195 | optional_yield y) override; | |
11fdf7f2 TL |
196 | |
197 | }; | |
198 | ||
199 | ||
200 | // a processor for multipart parts, which don't require atomic completion. the | |
201 | // part's head is written with an exclusive create to detect racing uploads of | |
202 | // the same part/upload id, which are restarted with a random oid prefix | |
203 | class MultipartObjectProcessor : public ManifestObjectProcessor { | |
20effc67 | 204 | std::unique_ptr<rgw::sal::Object> target_obj; // target multipart object |
11fdf7f2 TL |
205 | const std::string upload_id; |
206 | const int part_num; | |
207 | const std::string part_num_str; | |
208 | RGWMPObj mp; | |
209 | ||
210 | // write the first chunk and wait on aio->drain() for its completion. | |
211 | // on EEXIST, retry with random prefix | |
20effc67 | 212 | int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override; |
11fdf7f2 TL |
213 | // prepare the head stripe and manifest |
214 | int prepare_head(); | |
215 | public: | |
20effc67 | 216 | MultipartObjectProcessor(Aio *aio, rgw::sal::RadosStore* store, |
11fdf7f2 TL |
217 | const rgw_placement_rule *ptail_placement_rule, |
218 | const rgw_user& owner, RGWObjectCtx& obj_ctx, | |
20effc67 | 219 | std::unique_ptr<rgw::sal::Object> _head_obj, |
11fdf7f2 | 220 | const std::string& upload_id, uint64_t part_num, |
9f95a23c TL |
221 | const std::string& part_num_str, |
222 | const DoutPrefixProvider *dpp, optional_yield y) | |
20effc67 | 223 | : ManifestObjectProcessor(aio, store, ptail_placement_rule, |
f67539c2 TL |
224 | owner, obj_ctx, std::move(_head_obj), dpp, y), |
225 | target_obj(head_obj->clone()), upload_id(upload_id), | |
11fdf7f2 | 226 | part_num(part_num), part_num_str(part_num_str), |
f67539c2 | 227 | mp(head_obj->get_name(), upload_id) |
11fdf7f2 TL |
228 | {} |
229 | ||
230 | // prepare a multipart manifest | |
9f95a23c | 231 | int prepare(optional_yield y) override; |
11fdf7f2 TL |
232 | // write the head object attributes in a bucket index transaction, then |
233 | // register the completed part with the multipart meta object | |
234 | int complete(size_t accounted_size, const std::string& etag, | |
235 | ceph::real_time *mtime, ceph::real_time set_mtime, | |
236 | std::map<std::string, bufferlist>& attrs, | |
237 | ceph::real_time delete_at, | |
238 | const char *if_match, const char *if_nomatch, | |
239 | const std::string *user_data, | |
9f95a23c TL |
240 | rgw_zone_set *zones_trace, bool *canceled, |
241 | optional_yield y) override; | |
242 | ||
11fdf7f2 TL |
243 | }; |
244 | ||
245 | class AppendObjectProcessor : public ManifestObjectProcessor { | |
246 | uint64_t cur_part_num; | |
247 | uint64_t position; | |
248 | uint64_t cur_size; | |
249 | uint64_t *cur_accounted_size; | |
20effc67 | 250 | std::string cur_etag; |
11fdf7f2 TL |
251 | const std::string unique_tag; |
252 | ||
253 | RGWObjManifest *cur_manifest; | |
254 | ||
20effc67 | 255 | int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override; |
11fdf7f2 TL |
256 | |
257 | public: | |
20effc67 | 258 | AppendObjectProcessor(Aio *aio, rgw::sal::RadosStore* store, |
11fdf7f2 | 259 | const rgw_placement_rule *ptail_placement_rule, |
f67539c2 | 260 | const rgw_user& owner, RGWObjectCtx& obj_ctx, |
20effc67 | 261 | std::unique_ptr<rgw::sal::Object> _head_obj, |
9f95a23c TL |
262 | const std::string& unique_tag, uint64_t position, |
263 | uint64_t *cur_accounted_size, | |
264 | const DoutPrefixProvider *dpp, optional_yield y) | |
20effc67 | 265 | : ManifestObjectProcessor(aio, store, ptail_placement_rule, |
f67539c2 | 266 | owner, obj_ctx, std::move(_head_obj), dpp, y), |
11fdf7f2 TL |
267 | position(position), cur_size(0), cur_accounted_size(cur_accounted_size), |
268 | unique_tag(unique_tag), cur_manifest(nullptr) | |
269 | {} | |
9f95a23c | 270 | int prepare(optional_yield y) override; |
20effc67 | 271 | int complete(size_t accounted_size, const std::string& etag, |
11fdf7f2 | 272 | ceph::real_time *mtime, ceph::real_time set_mtime, |
20effc67 TL |
273 | std::map<std::string, bufferlist>& attrs, ceph::real_time delete_at, |
274 | const char *if_match, const char *if_nomatch, const std::string *user_data, | |
9f95a23c TL |
275 | rgw_zone_set *zones_trace, bool *canceled, |
276 | optional_yield y) override; | |
11fdf7f2 TL |
277 | }; |
278 | ||
279 | } // namespace putobj | |
280 | } // namespace rgw | |
281 |