]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_putobj_processor.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_putobj_processor.h
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
11fdf7f2
TL
3
4/*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2018 Red Hat, Inc.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16#pragma once
17
18#include <optional>
19
20#include "rgw_putobj.h"
21#include "rgw_rados.h"
22#include "services/svc_rados.h"
9f95a23c 23#include "services/svc_tier_rados.h"
11fdf7f2
TL
24
25namespace rgw {
26
27class Aio;
28
29namespace putobj {
30
31// a data consumer that writes an object in a bucket
32class ObjectProcessor : public DataProcessor {
33 public:
34 // prepare to start processing object data
9f95a23c 35 virtual int prepare(optional_yield y) = 0;
11fdf7f2
TL
36
37 // complete the operation and make its result visible to clients
38 virtual int complete(size_t accounted_size, const std::string& etag,
39 ceph::real_time *mtime, ceph::real_time set_mtime,
40 std::map<std::string, bufferlist>& attrs,
41 ceph::real_time delete_at,
42 const char *if_match, const char *if_nomatch,
43 const std::string *user_data,
9f95a23c
TL
44 rgw_zone_set *zones_trace, bool *canceled,
45 optional_yield y) = 0;
11fdf7f2
TL
46};
47
48// an object processor with special handling for the first chunk of the head.
49// the virtual process_first_chunk() function returns a processor to handle the
50// rest of the object
51class HeadObjectProcessor : public ObjectProcessor {
52 uint64_t head_chunk_size;
53 // buffer to capture the first chunk of the head object
54 bufferlist head_data;
55 // initialized after process_first_chunk() to process everything else
56 DataProcessor *processor = nullptr;
57 uint64_t data_offset = 0; // maximum offset of data written (ie compressed)
58 protected:
59 uint64_t get_actual_size() const { return data_offset; }
60
61 // process the first chunk of data and return a processor for the rest
62 virtual int process_first_chunk(bufferlist&& data,
63 DataProcessor **processor) = 0;
64 public:
65 HeadObjectProcessor(uint64_t head_chunk_size)
66 : head_chunk_size(head_chunk_size)
67 {}
68
69 void set_head_chunk_size(uint64_t size) { head_chunk_size = size; }
70
71 // cache first chunk for process_first_chunk(), then forward everything else
72 // to the returned processor
73 int process(bufferlist&& data, uint64_t logical_offset) final override;
74};
75
76
77using RawObjSet = std::set<rgw_raw_obj>;
78
79// a data sink that writes to rados objects and deletes them on cancelation
80class RadosWriter : public DataProcessor {
81 Aio *const aio;
9f95a23c 82 rgw::sal::RGWRadosStore *const store;
11fdf7f2
TL
83 const RGWBucketInfo& bucket_info;
84 RGWObjectCtx& obj_ctx;
85 const rgw_obj head_obj;
86 RGWSI_RADOS::Obj stripe_obj; // current stripe object
87 RawObjSet written; // set of written objects for deletion
9f95a23c
TL
88 const DoutPrefixProvider *dpp;
89 optional_yield y;
11fdf7f2
TL
90
91 public:
9f95a23c
TL
92 RadosWriter(Aio *aio, rgw::sal::RGWRadosStore *store,
93 const RGWBucketInfo& bucket_info,
94 RGWObjectCtx& obj_ctx, const rgw_obj& head_obj,
95 const DoutPrefixProvider *dpp, optional_yield y)
11fdf7f2 96 : aio(aio), store(store), bucket_info(bucket_info),
9f95a23c 97 obj_ctx(obj_ctx), head_obj(head_obj), dpp(dpp), y(y)
11fdf7f2
TL
98 {}
99 ~RadosWriter();
100
101 // change the current stripe object
102 int set_stripe_obj(const rgw_raw_obj& obj);
103
104 // write the data at the given offset of the current stripe object
105 int process(bufferlist&& data, uint64_t stripe_offset) override;
106
107 // write the data as an exclusive create and wait for it to complete
108 int write_exclusive(const bufferlist& data);
109
110 int drain();
111
112 // when the operation completes successfully, clear the set of written objects
113 // so they aren't deleted on destruction
114 void clear_written() { written.clear(); }
9f95a23c 115
11fdf7f2
TL
116};
117
118// a rados object processor that stripes according to RGWObjManifest
119class ManifestObjectProcessor : public HeadObjectProcessor,
120 public StripeGenerator {
121 protected:
9f95a23c 122 rgw::sal::RGWRadosStore *const store;
11fdf7f2
TL
123 const RGWBucketInfo& bucket_info;
124 rgw_placement_rule tail_placement_rule;
9f95a23c 125 rgw_user owner;
11fdf7f2
TL
126 RGWObjectCtx& obj_ctx;
127 rgw_obj head_obj;
128
129 RadosWriter writer;
130 RGWObjManifest manifest;
131 RGWObjManifest::generator manifest_gen;
132 ChunkProcessor chunk;
133 StripeProcessor stripe;
9f95a23c 134 const DoutPrefixProvider *dpp;
11fdf7f2
TL
135
136 // implements StripeGenerator
137 int next(uint64_t offset, uint64_t *stripe_size) override;
138
139 public:
9f95a23c 140 ManifestObjectProcessor(Aio *aio, rgw::sal::RGWRadosStore *store,
11fdf7f2
TL
141 const RGWBucketInfo& bucket_info,
142 const rgw_placement_rule *ptail_placement_rule,
143 const rgw_user& owner, RGWObjectCtx& obj_ctx,
9f95a23c
TL
144 const rgw_obj& head_obj,
145 const DoutPrefixProvider* dpp, optional_yield y)
11fdf7f2
TL
146 : HeadObjectProcessor(0),
147 store(store), bucket_info(bucket_info),
148 owner(owner),
149 obj_ctx(obj_ctx), head_obj(head_obj),
9f95a23c
TL
150 writer(aio, store, bucket_info, obj_ctx, head_obj, dpp, y),
151 chunk(&writer, 0), stripe(&chunk, this, 0), dpp(dpp) {
11fdf7f2
TL
152 if (ptail_placement_rule) {
153 tail_placement_rule = *ptail_placement_rule;
154 }
155 }
156
9f95a23c
TL
157 void set_owner(const rgw_user& _owner) {
158 owner = _owner;
159 }
160
161 void set_tail_placement(const rgw_placement_rule& tpr) {
162 tail_placement_rule = tpr;
163 }
11fdf7f2
TL
164 void set_tail_placement(const rgw_placement_rule&& tpr) {
165 tail_placement_rule = tpr;
166 }
9f95a23c 167
11fdf7f2
TL
168};
169
170
171// a processor that completes with an atomic write to the head object as part of
172// a bucket index transaction
173class AtomicObjectProcessor : public ManifestObjectProcessor {
174 const std::optional<uint64_t> olh_epoch;
175 const std::string unique_tag;
176 bufferlist first_chunk; // written with the head in complete()
177
178 int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
179 public:
9f95a23c 180 AtomicObjectProcessor(Aio *aio, rgw::sal::RGWRadosStore *store,
11fdf7f2
TL
181 const RGWBucketInfo& bucket_info,
182 const rgw_placement_rule *ptail_placement_rule,
183 const rgw_user& owner,
184 RGWObjectCtx& obj_ctx, const rgw_obj& head_obj,
185 std::optional<uint64_t> olh_epoch,
9f95a23c
TL
186 const std::string& unique_tag,
187 const DoutPrefixProvider *dpp, optional_yield y)
11fdf7f2 188 : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
9f95a23c 189 owner, obj_ctx, head_obj, dpp, y),
11fdf7f2
TL
190 olh_epoch(olh_epoch), unique_tag(unique_tag)
191 {}
192
193 // prepare a trivial manifest
9f95a23c 194 int prepare(optional_yield y) override;
11fdf7f2
TL
195 // write the head object atomically in a bucket index transaction
196 int complete(size_t accounted_size, const std::string& etag,
197 ceph::real_time *mtime, ceph::real_time set_mtime,
198 std::map<std::string, bufferlist>& attrs,
199 ceph::real_time delete_at,
200 const char *if_match, const char *if_nomatch,
201 const std::string *user_data,
9f95a23c
TL
202 rgw_zone_set *zones_trace, bool *canceled,
203 optional_yield y) override;
11fdf7f2
TL
204
205};
206
207
208// a processor for multipart parts, which don't require atomic completion. the
209// part's head is written with an exclusive create to detect racing uploads of
210// the same part/upload id, which are restarted with a random oid prefix
211class MultipartObjectProcessor : public ManifestObjectProcessor {
212 const rgw_obj target_obj; // target multipart object
213 const std::string upload_id;
214 const int part_num;
215 const std::string part_num_str;
216 RGWMPObj mp;
217
218 // write the first chunk and wait on aio->drain() for its completion.
219 // on EEXIST, retry with random prefix
220 int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
221 // prepare the head stripe and manifest
222 int prepare_head();
223 public:
9f95a23c 224 MultipartObjectProcessor(Aio *aio, rgw::sal::RGWRadosStore *store,
11fdf7f2
TL
225 const RGWBucketInfo& bucket_info,
226 const rgw_placement_rule *ptail_placement_rule,
227 const rgw_user& owner, RGWObjectCtx& obj_ctx,
228 const rgw_obj& head_obj,
229 const std::string& upload_id, uint64_t part_num,
9f95a23c
TL
230 const std::string& part_num_str,
231 const DoutPrefixProvider *dpp, optional_yield y)
11fdf7f2 232 : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
9f95a23c 233 owner, obj_ctx, head_obj, dpp, y),
11fdf7f2
TL
234 target_obj(head_obj), upload_id(upload_id),
235 part_num(part_num), part_num_str(part_num_str),
9f95a23c 236 mp(head_obj.key.name, upload_id)
11fdf7f2
TL
237 {}
238
239 // prepare a multipart manifest
9f95a23c 240 int prepare(optional_yield y) override;
11fdf7f2
TL
241 // write the head object attributes in a bucket index transaction, then
242 // register the completed part with the multipart meta object
243 int complete(size_t accounted_size, const std::string& etag,
244 ceph::real_time *mtime, ceph::real_time set_mtime,
245 std::map<std::string, bufferlist>& attrs,
246 ceph::real_time delete_at,
247 const char *if_match, const char *if_nomatch,
248 const std::string *user_data,
9f95a23c
TL
249 rgw_zone_set *zones_trace, bool *canceled,
250 optional_yield y) override;
251
11fdf7f2
TL
252};
253
254 class AppendObjectProcessor : public ManifestObjectProcessor {
255 uint64_t cur_part_num;
256 uint64_t position;
257 uint64_t cur_size;
258 uint64_t *cur_accounted_size;
259 string cur_etag;
260 const std::string unique_tag;
261
262 RGWObjManifest *cur_manifest;
263
264 int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
265
266 public:
9f95a23c 267 AppendObjectProcessor(Aio *aio, rgw::sal::RGWRadosStore *store, const RGWBucketInfo& bucket_info,
11fdf7f2
TL
268 const rgw_placement_rule *ptail_placement_rule,
269 const rgw_user& owner, RGWObjectCtx& obj_ctx,const rgw_obj& head_obj,
9f95a23c
TL
270 const std::string& unique_tag, uint64_t position,
271 uint64_t *cur_accounted_size,
272 const DoutPrefixProvider *dpp, optional_yield y)
273 : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
274 owner, obj_ctx, head_obj, dpp, y),
11fdf7f2
TL
275 position(position), cur_size(0), cur_accounted_size(cur_accounted_size),
276 unique_tag(unique_tag), cur_manifest(nullptr)
277 {}
9f95a23c 278 int prepare(optional_yield y) override;
11fdf7f2
TL
279 int complete(size_t accounted_size, const string& etag,
280 ceph::real_time *mtime, ceph::real_time set_mtime,
281 map<string, bufferlist>& attrs, ceph::real_time delete_at,
282 const char *if_match, const char *if_nomatch, const string *user_data,
9f95a23c
TL
283 rgw_zone_set *zones_trace, bool *canceled,
284 optional_yield y) override;
11fdf7f2
TL
285 };
286
287} // namespace putobj
288} // namespace rgw
289