1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2018 Red Hat, Inc.
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
#include <utility>

20 #include "rgw_putobj.h"
21 #include "services/svc_rados.h"
22 #include "services/svc_tier_rados.h"
24 #include "rgw_obj_manifest.h"
36 // an object processor with special handling for the first chunk of the head.
37 // the virtual process_first_chunk() function returns a processor to handle the
// rest of the data; process() forwards everything after the first chunk to
// that returned processor.
39 class HeadObjectProcessor
: public rgw::sal::ObjectProcessor
{
// size of the first (head) chunk; set by the constructor and adjustable
// afterwards through set_head_chunk_size()
40 uint64_t head_chunk_size
;
41 // buffer to capture the first chunk of the head object
// NOTE(review): the buffer member described by the comment above is not
// visible in this excerpt -- confirm against the full header
43 // initialized after process_first_chunk() to process everything else
// (starts out as nullptr)
44 rgw::sal::DataProcessor
*processor
= nullptr;
45 uint64_t data_offset
= 0; // maximum offset of data written (ie compressed)
// accessor for data_offset, i.e. the maximum offset of data written so far
47 uint64_t get_actual_size() const { return data_offset
; }
49 // process the first chunk of data and return a processor for the rest
// data:      the first chunk, handed over as an rvalue
// processor: out-parameter that receives the processor for subsequent data
// pure virtual -- every concrete processor provides its own implementation
50 virtual int process_first_chunk(bufferlist
&& data
,
51 rgw::sal::DataProcessor
**processor
) = 0;
// construct with the initial size of the head chunk
53 HeadObjectProcessor(uint64_t head_chunk_size
)
54 : head_chunk_size(head_chunk_size
)
// replace the head chunk size chosen at construction
57 void set_head_chunk_size(uint64_t size
) { head_chunk_size
= size
; }
59 // cache first chunk for process_first_chunk(), then forward everything else
60 // to the returned processor
// declared final: derived processors customize process_first_chunk() instead
61 int process(bufferlist
&& data
, uint64_t logical_offset
) final override
;
// ordered set of raw rados objects; RadosWriter uses it to remember which
// objects it has written so that they can be deleted on cancelation
64 using RawObjSet
= std::set
<rgw_raw_obj
>;
66 // a data sink that writes to rados objects and deletes them on cancelation
// NOTE(review): the constructor below also initializes members named `aio`
// and `y`; their declarations are not visible in this excerpt.
67 class RadosWriter
: public rgw::sal::DataProcessor
{
// store handle supplied by the caller; the pointer itself is const
69 RGWRados
*const store
;
// read-only reference to the bucket info passed to the constructor
70 const RGWBucketInfo
& bucket_info
;
// reference to the caller's object context
71 RGWObjectCtx
& obj_ctx
;
// copy of the head object passed to the constructor (immutable afterwards)
72 const rgw_obj head_obj
;
73 RGWSI_RADOS::Obj stripe_obj
; // current stripe object
74 RawObjSet written
; // set of written objects for deletion
// prefix provider supplied by the caller (presumably for log output -- confirm)
75 const DoutPrefixProvider
*dpp
;
// stores the given handles/references; head_obj is copied from _head_obj
79 RadosWriter(Aio
*aio
, RGWRados
*store
,
80 const RGWBucketInfo
& bucket_info
,
81 RGWObjectCtx
& obj_ctx
, const rgw_obj
& _head_obj
,
82 const DoutPrefixProvider
*dpp
, optional_yield y
)
83 : aio(aio
), store(store
), bucket_info(bucket_info
),
84 obj_ctx(obj_ctx
), head_obj(_head_obj
), dpp(dpp
), y(y
)
88 // add alloc hint to osd
// op: the write operation the hint is added to
89 void add_write_hint(librados::ObjectWriteOperation
& op
);
91 // change the current stripe object
// subsequent process() calls write to the stripe object selected here
92 int set_stripe_obj(const rgw_raw_obj
& obj
);
94 // write the data at the given offset of the current stripe object
// stripe_offset is relative to the current stripe object, not the whole upload
95 int process(bufferlist
&& data
, uint64_t stripe_offset
) override
;
97 // write the data as an exclusive create and wait for it to complete
98 int write_exclusive(const bufferlist
& data
);
102 // when the operation completes successfully, clear the set of written objects
103 // so they aren't deleted on destruction
104 void clear_written() { written
.clear(); }
109 // a rados object processor that stripes according to RGWObjManifest
// it is both a HeadObjectProcessor (first-chunk handling) and the
// StripeGenerator handed to its own StripeProcessor member.
// NOTE(review): the constructor initializes members named `head_obj` and
// `writer`, and set_owner() has a body, none of which are visible in this
// excerpt -- confirm against the full header before relying on them.
110 class ManifestObjectProcessor
: public HeadObjectProcessor
,
111 public StripeGenerator
{
// store handle supplied by the caller; the pointer itself is const
113 RGWRados
* const store
;
// reference to the caller's bucket info
114 RGWBucketInfo
& bucket_info
;
// placement rule for tail objects; default-constructed unless the
// constructor receives a non-null rule or set_tail_placement() is called
115 rgw_placement_rule tail_placement_rule
;
// reference to the caller's object context
117 RGWObjectCtx
& obj_ctx
;
121 RGWObjManifest manifest
;
122 RGWObjManifest::generator manifest_gen
;
// processing chain built by the constructor: stripe -> chunk -> writer
123 ChunkProcessor chunk
;
124 StripeProcessor stripe
;
125 const DoutPrefixProvider
*dpp
;
127 // implements StripeGenerator
128 int next(uint64_t offset
, uint64_t *stripe_size
) override
;
// the head chunk size and the chunk/stripe sizes all start at 0 here
// (presumably configured later by prepare() in derived classes -- confirm).
// ptail_placement_rule may be null, in which case tail_placement_rule keeps
// its default-constructed value.
131 ManifestObjectProcessor(Aio
*aio
, RGWRados
* store
,
132 RGWBucketInfo
& bucket_info
,
133 const rgw_placement_rule
*ptail_placement_rule
,
134 const rgw_user
& owner
, RGWObjectCtx
& _obj_ctx
,
135 const rgw_obj
& _head_obj
,
136 const DoutPrefixProvider
* dpp
, optional_yield y
)
137 : HeadObjectProcessor(0),
138 store(store
), bucket_info(bucket_info
),
140 obj_ctx(_obj_ctx
), head_obj(_head_obj
),
141 writer(aio
, store
, bucket_info
, obj_ctx
, head_obj
, dpp
, y
),
142 chunk(&writer
, 0), stripe(&chunk
, this, 0), dpp(dpp
) {
143 if (ptail_placement_rule
) {
144 tail_placement_rule
= *ptail_placement_rule
;
// replace the owner given at construction
148 void set_owner(const rgw_user
& _owner
) {
// lvalue overload: copies the given rule into tail_placement_rule
152 void set_tail_placement(const rgw_placement_rule
& tpr
) {
153 tail_placement_rule
= tpr
;
// rvalue overload: takes a non-const rvalue reference so the rule can be
// moved into place. (a const rvalue reference cannot be moved from and would
// just copy, duplicating the overload above; const rvalues still resolve to
// the const& overload, so existing callers are unaffected.)
155 void set_tail_placement(rgw_placement_rule
&& tpr
) {
156 tail_placement_rule
= std::move(tpr)
;
162 // a processor that completes with an atomic write to the head object as part of
163 // a bucket index transaction
164 class AtomicObjectProcessor
: public ManifestObjectProcessor
{
// optional olh epoch supplied at construction; immutable afterwards
165 const std::optional
<uint64_t> olh_epoch
;
// tag string supplied at construction; immutable afterwards
166 const std::string unique_tag
;
167 bufferlist first_chunk
; // written with the head in complete()
// HeadObjectProcessor hook for the first chunk of data
// (presumably stashes it in first_chunk for complete() -- confirm in the .cc)
169 int process_first_chunk(bufferlist
&& data
, rgw::sal::DataProcessor
**processor
) override
;
// forwards the common arguments to ManifestObjectProcessor and records
// olh_epoch and unique_tag
171 AtomicObjectProcessor(Aio
*aio
, RGWRados
* store
,
172 RGWBucketInfo
& bucket_info
,
173 const rgw_placement_rule
*ptail_placement_rule
,
174 const rgw_user
& owner
,
175 RGWObjectCtx
& obj_ctx
, const rgw_obj
& _head_obj
,
176 std::optional
<uint64_t> olh_epoch
,
177 const std::string
& unique_tag
,
178 const DoutPrefixProvider
*dpp
, optional_yield y
)
179 : ManifestObjectProcessor(aio
, store
, bucket_info
, ptail_placement_rule
,
180 owner
, obj_ctx
, _head_obj
, dpp
, y
),
181 olh_epoch(olh_epoch
), unique_tag(unique_tag
)
184 // prepare a trivial manifest
185 int prepare(optional_yield y
) override
;
186 // write the head object atomically in a bucket index transaction
// mtime, user_data, zones_trace and canceled are passed by pointer
// (presumably optional / out-parameters -- confirm against the base class)
187 int complete(size_t accounted_size
, const std::string
& etag
,
188 ceph::real_time
*mtime
, ceph::real_time set_mtime
,
189 std::map
<std::string
, bufferlist
>& attrs
,
190 ceph::real_time delete_at
,
191 const char *if_match
, const char *if_nomatch
,
192 const std::string
*user_data
,
193 rgw_zone_set
*zones_trace
, bool *canceled
,
194 optional_yield y
) override
;
199 // a processor for multipart parts, which don't require atomic completion. the
200 // part's head is written with an exclusive create to detect racing uploads of
201 // the same part/upload id, which are restarted with a random oid prefix
// NOTE(review): the constructor also initializes members named `part_num` and
// `mp`; their declarations are not visible in this excerpt.
202 class MultipartObjectProcessor
: public ManifestObjectProcessor
{
203 const rgw_obj target_obj
; // target multipart object
// upload id supplied at construction; immutable afterwards
204 const std::string upload_id
;
// string form of the part number, supplied separately by the caller
206 const std::string part_num_str
;
209 // write the first chunk and wait on aio->drain() for its completion.
210 // on EEXIST, retry with random prefix
211 int process_first_chunk(bufferlist
&& data
, rgw::sal::DataProcessor
**processor
) override
;
212 // prepare the head stripe and manifest
// NOTE(review): the declaration this comment refers to is not visible in
// this excerpt
// forwards the common arguments to ManifestObjectProcessor; target_obj is
// copied from the base class's head_obj, and mp is built from the head
// object's key name together with the upload id
215 MultipartObjectProcessor(Aio
*aio
, RGWRados
* store
,
216 RGWBucketInfo
& bucket_info
,
217 const rgw_placement_rule
*ptail_placement_rule
,
218 const rgw_user
& owner
, RGWObjectCtx
& obj_ctx
,
219 const rgw_obj
& _head_obj
,
220 const std::string
& upload_id
, uint64_t part_num
,
221 const std::string
& part_num_str
,
222 const DoutPrefixProvider
*dpp
, optional_yield y
)
223 : ManifestObjectProcessor(aio
, store
, bucket_info
, ptail_placement_rule
,
224 owner
, obj_ctx
, _head_obj
, dpp
, y
),
225 target_obj(head_obj
), upload_id(upload_id
),
226 part_num(part_num
), part_num_str(part_num_str
),
227 mp(head_obj
.key
.name
, upload_id
)
230 // prepare a multipart manifest
231 int prepare(optional_yield y
) override
;
232 // write the head object attributes in a bucket index transaction, then
233 // register the completed part with the multipart meta object
234 int complete(size_t accounted_size
, const std::string
& etag
,
235 ceph::real_time
*mtime
, ceph::real_time set_mtime
,
236 std::map
<std::string
, bufferlist
>& attrs
,
237 ceph::real_time delete_at
,
238 const char *if_match
, const char *if_nomatch
,
239 const std::string
*user_data
,
240 rgw_zone_set
*zones_trace
, bool *canceled
,
241 optional_yield y
) override
;
// a ManifestObjectProcessor variant constructed with a `position` and a
// pointer through which the caller shares an accounted size
// (presumably used for append-style writes to an existing object -- confirm).
// NOTE(review): the constructor also initializes members named `position` and
// `cur_size`; their declarations are not visible in this excerpt.
245 class AppendObjectProcessor
: public ManifestObjectProcessor
{
// NOTE(review): cur_part_num does not appear in the visible constructor
// initializer list -- confirm it is assigned before it is first read
246 uint64_t cur_part_num
;
// caller-supplied pointer, stored as given by the constructor
249 uint64_t *cur_accounted_size
;
250 std::string cur_etag
;
// tag string supplied at construction; immutable afterwards
251 const std::string unique_tag
;
// starts out as nullptr (see the constructor initializer list)
253 RGWObjManifest
*cur_manifest
;
// HeadObjectProcessor hook for the first chunk of data
255 int process_first_chunk(bufferlist
&& data
, rgw::sal::DataProcessor
**processor
) override
;
// forwards the common arguments to ManifestObjectProcessor; cur_size starts
// at 0 and cur_manifest at nullptr
258 AppendObjectProcessor(Aio
*aio
, RGWRados
* store
,
259 RGWBucketInfo
& bucket_info
,
260 const rgw_placement_rule
*ptail_placement_rule
,
261 const rgw_user
& owner
, RGWObjectCtx
& obj_ctx
,
262 const rgw_obj
& _head_obj
,
263 const std::string
& unique_tag
, uint64_t position
,
264 uint64_t *cur_accounted_size
,
265 const DoutPrefixProvider
*dpp
, optional_yield y
)
266 : ManifestObjectProcessor(aio
, store
, bucket_info
, ptail_placement_rule
,
267 owner
, obj_ctx
, _head_obj
, dpp
, y
),
268 position(position
), cur_size(0), cur_accounted_size(cur_accounted_size
),
269 unique_tag(unique_tag
), cur_manifest(nullptr)
// overrides of the ObjectProcessor prepare/complete steps; same parameter
// list as the other processors in this file
271 int prepare(optional_yield y
) override
;
272 int complete(size_t accounted_size
, const std::string
& etag
,
273 ceph::real_time
*mtime
, ceph::real_time set_mtime
,
274 std::map
<std::string
, bufferlist
>& attrs
, ceph::real_time delete_at
,
275 const char *if_match
, const char *if_nomatch
, const std::string
*user_data
,
276 rgw_zone_set
*zones_trace
, bool *canceled
,
277 optional_yield y
) override
;
280 } // namespace putobj