]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_putobj_processor.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_putobj_processor.h
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
11fdf7f2
TL
3
4/*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2018 Red Hat, Inc.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16#pragma once
17
18#include <optional>
19
20#include "rgw_putobj.h"
11fdf7f2 21#include "services/svc_rados.h"
9f95a23c 22#include "services/svc_tier_rados.h"
f67539c2
TL
23#include "rgw_sal.h"
24#include "rgw_obj_manifest.h"
11fdf7f2
TL
25
26namespace rgw {
27
20effc67
TL
28namespace sal {
29 class RadosStore;
30}
31
11fdf7f2
TL
32class Aio;
33
34namespace putobj {
35
11fdf7f2
TL
36// an object processor with special handling for the first chunk of the head.
37// the virtual process_first_chunk() function returns a processor to handle the
38// rest of the object
20effc67 39class HeadObjectProcessor : public rgw::sal::ObjectProcessor {
11fdf7f2
TL
40 uint64_t head_chunk_size;
41 // buffer to capture the first chunk of the head object
42 bufferlist head_data;
43 // initialized after process_first_chunk() to process everything else
20effc67 44 rgw::sal::DataProcessor *processor = nullptr;
11fdf7f2
TL
45 uint64_t data_offset = 0; // maximum offset of data written (ie compressed)
46 protected:
47 uint64_t get_actual_size() const { return data_offset; }
48
49 // process the first chunk of data and return a processor for the rest
50 virtual int process_first_chunk(bufferlist&& data,
20effc67 51 rgw::sal::DataProcessor **processor) = 0;
11fdf7f2
TL
52 public:
53 HeadObjectProcessor(uint64_t head_chunk_size)
54 : head_chunk_size(head_chunk_size)
55 {}
56
57 void set_head_chunk_size(uint64_t size) { head_chunk_size = size; }
58
59 // cache first chunk for process_first_chunk(), then forward everything else
60 // to the returned processor
61 int process(bufferlist&& data, uint64_t logical_offset) final override;
62};
63
11fdf7f2
TL
64using RawObjSet = std::set<rgw_raw_obj>;
65
66// a data sink that writes to rados objects and deletes them on cancelation
20effc67 67class RadosWriter : public rgw::sal::DataProcessor {
11fdf7f2 68 Aio *const aio;
20effc67 69 rgw::sal::RadosStore *const store;
11fdf7f2 70 RGWObjectCtx& obj_ctx;
20effc67 71 std::unique_ptr<rgw::sal::Object> head_obj;
11fdf7f2
TL
72 RGWSI_RADOS::Obj stripe_obj; // current stripe object
73 RawObjSet written; // set of written objects for deletion
9f95a23c
TL
74 const DoutPrefixProvider *dpp;
75 optional_yield y;
11fdf7f2
TL
76
77 public:
20effc67
TL
78 RadosWriter(Aio *aio, rgw::sal::RadosStore *store,
79 RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
9f95a23c 80 const DoutPrefixProvider *dpp, optional_yield y)
20effc67 81 : aio(aio), store(store),
f67539c2 82 obj_ctx(obj_ctx), head_obj(std::move(_head_obj)), dpp(dpp), y(y)
11fdf7f2 83 {}
f67539c2 84 RadosWriter(RadosWriter&& r)
20effc67 85 : aio(r.aio), store(r.store),
f67539c2
TL
86 obj_ctx(r.obj_ctx), head_obj(std::move(r.head_obj)), dpp(r.dpp), y(r.y)
87 {}
88
11fdf7f2
TL
89 ~RadosWriter();
90
20effc67
TL
91 // add alloc hint to osd
92 void add_write_hint(librados::ObjectWriteOperation& op);
93
11fdf7f2
TL
94 // change the current stripe object
95 int set_stripe_obj(const rgw_raw_obj& obj);
96
97 // write the data at the given offset of the current stripe object
98 int process(bufferlist&& data, uint64_t stripe_offset) override;
99
100 // write the data as an exclusive create and wait for it to complete
101 int write_exclusive(const bufferlist& data);
102
103 int drain();
104
105 // when the operation completes successfully, clear the set of written objects
106 // so they aren't deleted on destruction
107 void clear_written() { written.clear(); }
9f95a23c 108
11fdf7f2
TL
109};
110
20effc67 111
11fdf7f2
TL
112// a rados object processor that stripes according to RGWObjManifest
113class ManifestObjectProcessor : public HeadObjectProcessor,
114 public StripeGenerator {
115 protected:
20effc67 116 rgw::sal::RadosStore* const store;
11fdf7f2 117 rgw_placement_rule tail_placement_rule;
9f95a23c 118 rgw_user owner;
11fdf7f2 119 RGWObjectCtx& obj_ctx;
20effc67 120 std::unique_ptr<rgw::sal::Object> head_obj;
11fdf7f2
TL
121
122 RadosWriter writer;
123 RGWObjManifest manifest;
124 RGWObjManifest::generator manifest_gen;
125 ChunkProcessor chunk;
126 StripeProcessor stripe;
9f95a23c 127 const DoutPrefixProvider *dpp;
11fdf7f2
TL
128
129 // implements StripeGenerator
130 int next(uint64_t offset, uint64_t *stripe_size) override;
131
132 public:
20effc67 133 ManifestObjectProcessor(Aio *aio, rgw::sal::RadosStore* store,
11fdf7f2
TL
134 const rgw_placement_rule *ptail_placement_rule,
135 const rgw_user& owner, RGWObjectCtx& obj_ctx,
20effc67 136 std::unique_ptr<rgw::sal::Object> _head_obj,
9f95a23c 137 const DoutPrefixProvider* dpp, optional_yield y)
11fdf7f2 138 : HeadObjectProcessor(0),
20effc67 139 store(store),
11fdf7f2 140 owner(owner),
f67539c2 141 obj_ctx(obj_ctx), head_obj(std::move(_head_obj)),
20effc67 142 writer(aio, store, obj_ctx, head_obj->clone(), dpp, y),
9f95a23c 143 chunk(&writer, 0), stripe(&chunk, this, 0), dpp(dpp) {
11fdf7f2
TL
144 if (ptail_placement_rule) {
145 tail_placement_rule = *ptail_placement_rule;
146 }
147 }
148
9f95a23c
TL
149 void set_owner(const rgw_user& _owner) {
150 owner = _owner;
151 }
152
153 void set_tail_placement(const rgw_placement_rule& tpr) {
154 tail_placement_rule = tpr;
155 }
11fdf7f2
TL
156 void set_tail_placement(const rgw_placement_rule&& tpr) {
157 tail_placement_rule = tpr;
158 }
9f95a23c 159
11fdf7f2
TL
160};
161
162
163// a processor that completes with an atomic write to the head object as part of
164// a bucket index transaction
165class AtomicObjectProcessor : public ManifestObjectProcessor {
166 const std::optional<uint64_t> olh_epoch;
167 const std::string unique_tag;
168 bufferlist first_chunk; // written with the head in complete()
169
20effc67 170 int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override;
11fdf7f2 171 public:
20effc67 172 AtomicObjectProcessor(Aio *aio, rgw::sal::RadosStore* store,
11fdf7f2
TL
173 const rgw_placement_rule *ptail_placement_rule,
174 const rgw_user& owner,
f67539c2 175 RGWObjectCtx& obj_ctx,
20effc67 176 std::unique_ptr<rgw::sal::Object> _head_obj,
11fdf7f2 177 std::optional<uint64_t> olh_epoch,
9f95a23c
TL
178 const std::string& unique_tag,
179 const DoutPrefixProvider *dpp, optional_yield y)
20effc67 180 : ManifestObjectProcessor(aio, store, ptail_placement_rule,
f67539c2 181 owner, obj_ctx, std::move(_head_obj), dpp, y),
11fdf7f2
TL
182 olh_epoch(olh_epoch), unique_tag(unique_tag)
183 {}
184
185 // prepare a trivial manifest
9f95a23c 186 int prepare(optional_yield y) override;
11fdf7f2
TL
187 // write the head object atomically in a bucket index transaction
188 int complete(size_t accounted_size, const std::string& etag,
189 ceph::real_time *mtime, ceph::real_time set_mtime,
190 std::map<std::string, bufferlist>& attrs,
191 ceph::real_time delete_at,
192 const char *if_match, const char *if_nomatch,
193 const std::string *user_data,
9f95a23c
TL
194 rgw_zone_set *zones_trace, bool *canceled,
195 optional_yield y) override;
11fdf7f2
TL
196
197};
198
199
200// a processor for multipart parts, which don't require atomic completion. the
201// part's head is written with an exclusive create to detect racing uploads of
202// the same part/upload id, which are restarted with a random oid prefix
203class MultipartObjectProcessor : public ManifestObjectProcessor {
20effc67 204 std::unique_ptr<rgw::sal::Object> target_obj; // target multipart object
11fdf7f2
TL
205 const std::string upload_id;
206 const int part_num;
207 const std::string part_num_str;
208 RGWMPObj mp;
209
210 // write the first chunk and wait on aio->drain() for its completion.
211 // on EEXIST, retry with random prefix
20effc67 212 int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override;
11fdf7f2
TL
213 // prepare the head stripe and manifest
214 int prepare_head();
215 public:
20effc67 216 MultipartObjectProcessor(Aio *aio, rgw::sal::RadosStore* store,
11fdf7f2
TL
217 const rgw_placement_rule *ptail_placement_rule,
218 const rgw_user& owner, RGWObjectCtx& obj_ctx,
20effc67 219 std::unique_ptr<rgw::sal::Object> _head_obj,
11fdf7f2 220 const std::string& upload_id, uint64_t part_num,
9f95a23c
TL
221 const std::string& part_num_str,
222 const DoutPrefixProvider *dpp, optional_yield y)
20effc67 223 : ManifestObjectProcessor(aio, store, ptail_placement_rule,
f67539c2
TL
224 owner, obj_ctx, std::move(_head_obj), dpp, y),
225 target_obj(head_obj->clone()), upload_id(upload_id),
11fdf7f2 226 part_num(part_num), part_num_str(part_num_str),
f67539c2 227 mp(head_obj->get_name(), upload_id)
11fdf7f2
TL
228 {}
229
230 // prepare a multipart manifest
9f95a23c 231 int prepare(optional_yield y) override;
11fdf7f2
TL
232 // write the head object attributes in a bucket index transaction, then
233 // register the completed part with the multipart meta object
234 int complete(size_t accounted_size, const std::string& etag,
235 ceph::real_time *mtime, ceph::real_time set_mtime,
236 std::map<std::string, bufferlist>& attrs,
237 ceph::real_time delete_at,
238 const char *if_match, const char *if_nomatch,
239 const std::string *user_data,
9f95a23c
TL
240 rgw_zone_set *zones_trace, bool *canceled,
241 optional_yield y) override;
242
11fdf7f2
TL
243};
244
245 class AppendObjectProcessor : public ManifestObjectProcessor {
246 uint64_t cur_part_num;
247 uint64_t position;
248 uint64_t cur_size;
249 uint64_t *cur_accounted_size;
20effc67 250 std::string cur_etag;
11fdf7f2
TL
251 const std::string unique_tag;
252
253 RGWObjManifest *cur_manifest;
254
20effc67 255 int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override;
11fdf7f2
TL
256
257 public:
20effc67 258 AppendObjectProcessor(Aio *aio, rgw::sal::RadosStore* store,
11fdf7f2 259 const rgw_placement_rule *ptail_placement_rule,
f67539c2 260 const rgw_user& owner, RGWObjectCtx& obj_ctx,
20effc67 261 std::unique_ptr<rgw::sal::Object> _head_obj,
9f95a23c
TL
262 const std::string& unique_tag, uint64_t position,
263 uint64_t *cur_accounted_size,
264 const DoutPrefixProvider *dpp, optional_yield y)
20effc67 265 : ManifestObjectProcessor(aio, store, ptail_placement_rule,
f67539c2 266 owner, obj_ctx, std::move(_head_obj), dpp, y),
11fdf7f2
TL
267 position(position), cur_size(0), cur_accounted_size(cur_accounted_size),
268 unique_tag(unique_tag), cur_manifest(nullptr)
269 {}
9f95a23c 270 int prepare(optional_yield y) override;
20effc67 271 int complete(size_t accounted_size, const std::string& etag,
11fdf7f2 272 ceph::real_time *mtime, ceph::real_time set_mtime,
20effc67
TL
273 std::map<std::string, bufferlist>& attrs, ceph::real_time delete_at,
274 const char *if_match, const char *if_nomatch, const std::string *user_data,
9f95a23c
TL
275 rgw_zone_set *zones_trace, bool *canceled,
276 optional_yield y) override;
11fdf7f2
TL
277 };
278
279} // namespace putobj
280} // namespace rgw
281