]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/driver/rados/rgw_putobj_processor.h
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / rgw / driver / rados / rgw_putobj_processor.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2018 Red Hat, Inc.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #pragma once
17
#include <map>
#include <optional>
#include <set>
#include <string>
#include <utility>

#include "rgw_putobj.h"
#include "services/svc_rados.h"
#include "services/svc_tier_rados.h"
#include "rgw_sal.h"
#include "rgw_obj_manifest.h"
25
26 namespace rgw {
27
28 namespace sal {
29 class RadosStore;
30 }
31
32 class Aio;
33
34 namespace putobj {
35
36 // an object processor with special handling for the first chunk of the head.
37 // the virtual process_first_chunk() function returns a processor to handle the
38 // rest of the object
39 class HeadObjectProcessor : public rgw::sal::ObjectProcessor {
40 uint64_t head_chunk_size;
41 // buffer to capture the first chunk of the head object
42 bufferlist head_data;
43 // initialized after process_first_chunk() to process everything else
44 rgw::sal::DataProcessor *processor = nullptr;
45 uint64_t data_offset = 0; // maximum offset of data written (ie compressed)
46 protected:
47 uint64_t get_actual_size() const { return data_offset; }
48
49 // process the first chunk of data and return a processor for the rest
50 virtual int process_first_chunk(bufferlist&& data,
51 rgw::sal::DataProcessor **processor) = 0;
52 public:
53 HeadObjectProcessor(uint64_t head_chunk_size)
54 : head_chunk_size(head_chunk_size)
55 {}
56
57 void set_head_chunk_size(uint64_t size) { head_chunk_size = size; }
58
59 // cache first chunk for process_first_chunk(), then forward everything else
60 // to the returned processor
61 int process(bufferlist&& data, uint64_t logical_offset) final override;
62 };
63
64 using RawObjSet = std::set<rgw_raw_obj>;
65
66 // a data sink that writes to rados objects and deletes them on cancelation
67 class RadosWriter : public rgw::sal::DataProcessor {
68 Aio *const aio;
69 RGWRados *const store;
70 const RGWBucketInfo& bucket_info;
71 RGWObjectCtx& obj_ctx;
72 const rgw_obj head_obj;
73 RGWSI_RADOS::Obj stripe_obj; // current stripe object
74 RawObjSet written; // set of written objects for deletion
75 const DoutPrefixProvider *dpp;
76 optional_yield y;
77
78 public:
79 RadosWriter(Aio *aio, RGWRados *store,
80 const RGWBucketInfo& bucket_info,
81 RGWObjectCtx& obj_ctx, const rgw_obj& _head_obj,
82 const DoutPrefixProvider *dpp, optional_yield y)
83 : aio(aio), store(store), bucket_info(bucket_info),
84 obj_ctx(obj_ctx), head_obj(_head_obj), dpp(dpp), y(y)
85 {}
86 ~RadosWriter();
87
88 // add alloc hint to osd
89 void add_write_hint(librados::ObjectWriteOperation& op);
90
91 // change the current stripe object
92 int set_stripe_obj(const rgw_raw_obj& obj);
93
94 // write the data at the given offset of the current stripe object
95 int process(bufferlist&& data, uint64_t stripe_offset) override;
96
97 // write the data as an exclusive create and wait for it to complete
98 int write_exclusive(const bufferlist& data);
99
100 int drain();
101
102 // when the operation completes successfully, clear the set of written objects
103 // so they aren't deleted on destruction
104 void clear_written() { written.clear(); }
105
106 };
107
108
109 // a rados object processor that stripes according to RGWObjManifest
110 class ManifestObjectProcessor : public HeadObjectProcessor,
111 public StripeGenerator {
112 protected:
113 RGWRados* const store;
114 RGWBucketInfo& bucket_info;
115 rgw_placement_rule tail_placement_rule;
116 rgw_user owner;
117 RGWObjectCtx& obj_ctx;
118 rgw_obj head_obj;
119
120 RadosWriter writer;
121 RGWObjManifest manifest;
122 RGWObjManifest::generator manifest_gen;
123 ChunkProcessor chunk;
124 StripeProcessor stripe;
125 const DoutPrefixProvider *dpp;
126
127 // implements StripeGenerator
128 int next(uint64_t offset, uint64_t *stripe_size) override;
129
130 public:
131 ManifestObjectProcessor(Aio *aio, RGWRados* store,
132 RGWBucketInfo& bucket_info,
133 const rgw_placement_rule *ptail_placement_rule,
134 const rgw_user& owner, RGWObjectCtx& _obj_ctx,
135 const rgw_obj& _head_obj,
136 const DoutPrefixProvider* dpp, optional_yield y)
137 : HeadObjectProcessor(0),
138 store(store), bucket_info(bucket_info),
139 owner(owner),
140 obj_ctx(_obj_ctx), head_obj(_head_obj),
141 writer(aio, store, bucket_info, obj_ctx, head_obj, dpp, y),
142 chunk(&writer, 0), stripe(&chunk, this, 0), dpp(dpp) {
143 if (ptail_placement_rule) {
144 tail_placement_rule = *ptail_placement_rule;
145 }
146 }
147
148 void set_owner(const rgw_user& _owner) {
149 owner = _owner;
150 }
151
152 void set_tail_placement(const rgw_placement_rule& tpr) {
153 tail_placement_rule = tpr;
154 }
155 void set_tail_placement(const rgw_placement_rule&& tpr) {
156 tail_placement_rule = tpr;
157 }
158
159 };
160
161
162 // a processor that completes with an atomic write to the head object as part of
163 // a bucket index transaction
164 class AtomicObjectProcessor : public ManifestObjectProcessor {
165 const std::optional<uint64_t> olh_epoch;
166 const std::string unique_tag;
167 bufferlist first_chunk; // written with the head in complete()
168
169 int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override;
170 public:
171 AtomicObjectProcessor(Aio *aio, RGWRados* store,
172 RGWBucketInfo& bucket_info,
173 const rgw_placement_rule *ptail_placement_rule,
174 const rgw_user& owner,
175 RGWObjectCtx& obj_ctx, const rgw_obj& _head_obj,
176 std::optional<uint64_t> olh_epoch,
177 const std::string& unique_tag,
178 const DoutPrefixProvider *dpp, optional_yield y)
179 : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
180 owner, obj_ctx, _head_obj, dpp, y),
181 olh_epoch(olh_epoch), unique_tag(unique_tag)
182 {}
183
184 // prepare a trivial manifest
185 int prepare(optional_yield y) override;
186 // write the head object atomically in a bucket index transaction
187 int complete(size_t accounted_size, const std::string& etag,
188 ceph::real_time *mtime, ceph::real_time set_mtime,
189 std::map<std::string, bufferlist>& attrs,
190 ceph::real_time delete_at,
191 const char *if_match, const char *if_nomatch,
192 const std::string *user_data,
193 rgw_zone_set *zones_trace, bool *canceled,
194 optional_yield y) override;
195
196 };
197
198
199 // a processor for multipart parts, which don't require atomic completion. the
200 // part's head is written with an exclusive create to detect racing uploads of
201 // the same part/upload id, which are restarted with a random oid prefix
202 class MultipartObjectProcessor : public ManifestObjectProcessor {
203 const rgw_obj target_obj; // target multipart object
204 const std::string upload_id;
205 const int part_num;
206 const std::string part_num_str;
207 RGWMPObj mp;
208
209 // write the first chunk and wait on aio->drain() for its completion.
210 // on EEXIST, retry with random prefix
211 int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override;
212 // prepare the head stripe and manifest
213 int prepare_head();
214 public:
215 MultipartObjectProcessor(Aio *aio, RGWRados* store,
216 RGWBucketInfo& bucket_info,
217 const rgw_placement_rule *ptail_placement_rule,
218 const rgw_user& owner, RGWObjectCtx& obj_ctx,
219 const rgw_obj& _head_obj,
220 const std::string& upload_id, uint64_t part_num,
221 const std::string& part_num_str,
222 const DoutPrefixProvider *dpp, optional_yield y)
223 : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
224 owner, obj_ctx, _head_obj, dpp, y),
225 target_obj(head_obj), upload_id(upload_id),
226 part_num(part_num), part_num_str(part_num_str),
227 mp(head_obj.key.name, upload_id)
228 {}
229
230 // prepare a multipart manifest
231 int prepare(optional_yield y) override;
232 // write the head object attributes in a bucket index transaction, then
233 // register the completed part with the multipart meta object
234 int complete(size_t accounted_size, const std::string& etag,
235 ceph::real_time *mtime, ceph::real_time set_mtime,
236 std::map<std::string, bufferlist>& attrs,
237 ceph::real_time delete_at,
238 const char *if_match, const char *if_nomatch,
239 const std::string *user_data,
240 rgw_zone_set *zones_trace, bool *canceled,
241 optional_yield y) override;
242
243 };
244
245 class AppendObjectProcessor : public ManifestObjectProcessor {
246 uint64_t cur_part_num;
247 uint64_t position;
248 uint64_t cur_size;
249 uint64_t *cur_accounted_size;
250 std::string cur_etag;
251 const std::string unique_tag;
252
253 RGWObjManifest *cur_manifest;
254
255 int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override;
256
257 public:
258 AppendObjectProcessor(Aio *aio, RGWRados* store,
259 RGWBucketInfo& bucket_info,
260 const rgw_placement_rule *ptail_placement_rule,
261 const rgw_user& owner, RGWObjectCtx& obj_ctx,
262 const rgw_obj& _head_obj,
263 const std::string& unique_tag, uint64_t position,
264 uint64_t *cur_accounted_size,
265 const DoutPrefixProvider *dpp, optional_yield y)
266 : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
267 owner, obj_ctx, _head_obj, dpp, y),
268 position(position), cur_size(0), cur_accounted_size(cur_accounted_size),
269 unique_tag(unique_tag), cur_manifest(nullptr)
270 {}
271 int prepare(optional_yield y) override;
272 int complete(size_t accounted_size, const std::string& etag,
273 ceph::real_time *mtime, ceph::real_time set_mtime,
274 std::map<std::string, bufferlist>& attrs, ceph::real_time delete_at,
275 const char *if_match, const char *if_nomatch, const std::string *user_data,
276 rgw_zone_set *zones_trace, bool *canceled,
277 optional_yield y) override;
278 };
279
280 } // namespace putobj
281 } // namespace rgw
282