// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
19 #include "ECTransaction.h"
21 #include "os/ObjectStore.h"
22 #include "common/inline_variant.h"
25 void encode_and_write(
28 const ECUtil::stripe_info_t
&sinfo
,
29 ErasureCodeInterfaceRef
&ecimpl
,
34 ECUtil::HashInfoRef hinfo
,
36 map
<shard_id_t
, ObjectStore::Transaction
> *transactions
,
37 DoutPrefixProvider
*dpp
) {
38 const uint64_t before_size
= hinfo
->get_total_logical_size(sinfo
);
39 ceph_assert(sinfo
.logical_offset_is_stripe_aligned(offset
));
40 ceph_assert(sinfo
.logical_offset_is_stripe_aligned(bl
.length()));
41 ceph_assert(bl
.length());
43 map
<int, bufferlist
> buffers
;
44 int r
= ECUtil::encode(
45 sinfo
, ecimpl
, bl
, want
, &buffers
);
48 written
.insert(offset
, bl
.length(), bl
);
50 ldpp_dout(dpp
, 20) << __func__
<< ": " << oid
52 << offset
+ bl
.length()
55 if (offset
>= before_size
) {
56 ceph_assert(offset
== before_size
);
58 sinfo
.aligned_logical_offset_to_chunk_offset(offset
),
62 for (auto &&i
: *transactions
) {
63 ceph_assert(buffers
.count(i
.first
));
64 bufferlist
&enc_bl
= buffers
[i
.first
];
65 if (offset
>= before_size
) {
66 i
.second
.set_alloc_hint(
67 coll_t(spg_t(pgid
, i
.first
)),
68 ghobject_t(oid
, ghobject_t::NO_GEN
, i
.first
),
70 CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE
|
71 CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY
);
74 coll_t(spg_t(pgid
, i
.first
)),
75 ghobject_t(oid
, ghobject_t::NO_GEN
, i
.first
),
76 sinfo
.logical_to_prev_chunk_offset(
84 bool ECTransaction::requires_overwrite(
86 const PGTransaction::ObjectOperation
&op
) {
87 // special handling for truncates to 0
88 if (op
.truncate
&& op
.truncate
->first
== 0)
90 return op
.is_none() &&
91 ((!op
.buffer_updates
.empty() &&
92 (op
.buffer_updates
.begin().get_off() < prev_size
)) ||
94 (op
.truncate
->first
< prev_size
)));
97 void ECTransaction::generate_transactions(
99 ErasureCodeInterfaceRef
&ecimpl
,
101 const ECUtil::stripe_info_t
&sinfo
,
102 const map
<hobject_t
,extent_map
> &partial_extents
,
103 vector
<pg_log_entry_t
> &entries
,
104 map
<hobject_t
,extent_map
> *written_map
,
105 map
<shard_id_t
, ObjectStore::Transaction
> *transactions
,
106 set
<hobject_t
> *temp_added
,
107 set
<hobject_t
> *temp_removed
,
108 DoutPrefixProvider
*dpp
,
109 const ceph_release_t require_osd_release
)
111 ceph_assert(written_map
);
112 ceph_assert(transactions
);
113 ceph_assert(temp_added
);
114 ceph_assert(temp_removed
);
118 auto &hash_infos
= plan
.hash_infos
;
120 map
<hobject_t
, pg_log_entry_t
*> obj_to_log
;
121 for (auto &&i
: entries
) {
122 obj_to_log
.insert(make_pair(i
.soid
, &i
));
125 t
.safe_create_traverse(
126 [&](pair
<const hobject_t
, PGTransaction::ObjectOperation
> &opair
) {
127 const hobject_t
&oid
= opair
.first
;
128 auto &op
= opair
.second
;
129 auto &obc_map
= t
.obc_map
;
130 auto &written
= (*written_map
)[oid
];
132 auto iter
= obj_to_log
.find(oid
);
133 pg_log_entry_t
*entry
= iter
!= obj_to_log
.end() ? iter
->second
: nullptr;
135 ObjectContextRef obc
;
136 auto obiter
= t
.obc_map
.find(oid
);
137 if (obiter
!= t
.obc_map
.end()) {
138 obc
= obiter
->second
;
143 ceph_assert(oid
.is_temp());
146 ECUtil::HashInfoRef hinfo
;
148 auto iter
= hash_infos
.find(oid
);
149 ceph_assert(iter
!= hash_infos
.end());
150 hinfo
= iter
->second
;
154 if (op
.is_fresh_object()) {
155 temp_added
->insert(oid
);
156 } else if (op
.is_delete()) {
157 temp_removed
->insert(oid
);
162 entry
->is_modify() &&
164 bufferlist
bl(op
.updated_snaps
->second
.size() * 8 + 8);
165 encode(op
.updated_snaps
->second
, bl
);
166 entry
->snaps
.swap(bl
);
167 entry
->snaps
.reassign_to_mempool(mempool::mempool_osd_pglog
);
170 ldpp_dout(dpp
, 20) << "generate_transactions: "
172 << ", current size is "
173 << hinfo
->get_total_logical_size(sinfo
)
178 ldpp_dout(dpp
, 20) << "generate_transactions: "
184 if (entry
&& op
.updated_snaps
) {
185 entry
->mod_desc
.update_snaps(op
.updated_snaps
->first
);
188 map
<string
, std::optional
<bufferlist
> > xattr_rollback
;
190 bufferlist old_hinfo
;
191 encode(*hinfo
, old_hinfo
);
192 xattr_rollback
[ECUtil::get_hinfo_key()] = old_hinfo
;
194 if (op
.is_none() && op
.truncate
&& op
.truncate
->first
== 0) {
195 ceph_assert(op
.truncate
->first
== 0);
196 ceph_assert(op
.truncate
->first
==
197 op
.truncate
->second
);
201 if (op
.truncate
->first
!= op
.truncate
->second
) {
202 op
.truncate
->first
= op
.truncate
->second
;
204 op
.truncate
= std::nullopt
;
207 op
.delete_first
= true;
208 op
.init_type
= PGTransaction::ObjectOperation::Init::Create();
211 /* We need to reapply all of the cached xattrs.
212 * std::map insert fortunately only writes keys
213 * which don't already exist, so this should do
214 * the right thing. */
215 op
.attr_updates
.insert(
216 obc
->attr_cache
.begin(),
217 obc
->attr_cache
.end());
221 if (op
.delete_first
) {
222 /* We also want to remove the std::nullopt entries since
223 * the keys already won't exist */
224 for (auto j
= op
.attr_updates
.begin();
225 j
!= op
.attr_updates
.end();
230 op
.attr_updates
.erase(j
++);
233 /* Fill in all current entries for xattr rollback */
235 xattr_rollback
.insert(
236 obc
->attr_cache
.begin(),
237 obc
->attr_cache
.end());
238 obc
->attr_cache
.clear();
241 entry
->mod_desc
.rmobject(entry
->version
.version
);
242 for (auto &&st
: *transactions
) {
243 st
.second
.collection_move_rename(
244 coll_t(spg_t(pgid
, st
.first
)),
245 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
246 coll_t(spg_t(pgid
, st
.first
)),
247 ghobject_t(oid
, entry
->version
.version
, st
.first
));
250 for (auto &&st
: *transactions
) {
252 coll_t(spg_t(pgid
, st
.first
)),
253 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
));
259 if (op
.is_fresh_object() && entry
) {
260 entry
->mod_desc
.create();
265 [&](const PGTransaction::ObjectOperation::Init::None
&) {},
266 [&](const PGTransaction::ObjectOperation::Init::Create
&op
) {
267 for (auto &&st
: *transactions
) {
268 if (require_osd_release
>= ceph_release_t::octopus
) {
270 coll_t(spg_t(pgid
, st
.first
)),
271 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
));
274 coll_t(spg_t(pgid
, st
.first
)),
275 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
));
279 [&](const PGTransaction::ObjectOperation::Init::Clone
&op
) {
280 for (auto &&st
: *transactions
) {
282 coll_t(spg_t(pgid
, st
.first
)),
283 ghobject_t(op
.source
, ghobject_t::NO_GEN
, st
.first
),
284 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
));
287 auto siter
= hash_infos
.find(op
.source
);
288 ceph_assert(siter
!= hash_infos
.end());
289 hinfo
->update_to(*(siter
->second
));
292 auto cobciter
= obc_map
.find(op
.source
);
293 ceph_assert(cobciter
!= obc_map
.end());
294 obc
->attr_cache
= cobciter
->second
->attr_cache
;
297 [&](const PGTransaction::ObjectOperation::Init::Rename
&op
) {
298 ceph_assert(op
.source
.is_temp());
299 for (auto &&st
: *transactions
) {
300 st
.second
.collection_move_rename(
301 coll_t(spg_t(pgid
, st
.first
)),
302 ghobject_t(op
.source
, ghobject_t::NO_GEN
, st
.first
),
303 coll_t(spg_t(pgid
, st
.first
)),
304 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
));
306 auto siter
= hash_infos
.find(op
.source
);
307 ceph_assert(siter
!= hash_infos
.end());
308 hinfo
->update_to(*(siter
->second
));
310 auto cobciter
= obc_map
.find(op
.source
);
311 ceph_assert(cobciter
== obc_map
.end());
312 obc
->attr_cache
.clear();
316 // omap not supported (except 0, handled above)
317 ceph_assert(!(op
.clear_omap
));
318 ceph_assert(!(op
.omap_header
));
319 ceph_assert(op
.omap_updates
.empty());
321 if (!op
.attr_updates
.empty()) {
322 map
<string
, bufferlist
> to_set
;
323 for (auto &&j
: op
.attr_updates
) {
325 to_set
[j
.first
] = *(j
.second
);
327 for (auto &&st
: *transactions
) {
329 coll_t(spg_t(pgid
, st
.first
)),
330 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
335 auto citer
= obc
->attr_cache
.find(j
.first
);
337 if (citer
!= obc
->attr_cache
.end()) {
338 // won't overwrite anything we put in earlier
339 xattr_rollback
.insert(
342 std::optional
<bufferlist
>(citer
->second
)));
344 // won't overwrite anything we put in earlier
345 xattr_rollback
.insert(
352 obc
->attr_cache
[j
.first
] = *(j
.second
);
353 } else if (citer
!= obc
->attr_cache
.end()) {
354 obc
->attr_cache
.erase(citer
);
360 for (auto &&st
: *transactions
) {
362 coll_t(spg_t(pgid
, st
.first
)),
363 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
366 ceph_assert(!xattr_rollback
.empty());
368 if (entry
&& !xattr_rollback
.empty()) {
369 entry
->mod_desc
.setattrs(xattr_rollback
);
373 /* logical_to_next_chunk_offset() scales down both aligned and
376 * we don't bother to roll this back at this time for two reasons:
378 * 2) we don't track the old value */
379 uint64_t object_size
= sinfo
.logical_to_next_chunk_offset(
380 op
.alloc_hint
->expected_object_size
);
381 uint64_t write_size
= sinfo
.logical_to_next_chunk_offset(
382 op
.alloc_hint
->expected_write_size
);
384 for (auto &&st
: *transactions
) {
385 st
.second
.set_alloc_hint(
386 coll_t(spg_t(pgid
, st
.first
)),
387 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
390 op
.alloc_hint
->flags
);
395 auto pextiter
= partial_extents
.find(oid
);
396 if (pextiter
!= partial_extents
.end()) {
397 to_write
= pextiter
->second
;
400 vector
<pair
<uint64_t, uint64_t> > rollback_extents
;
401 const uint64_t orig_size
= hinfo
->get_total_logical_size(sinfo
);
403 uint64_t new_size
= orig_size
;
404 uint64_t append_after
= new_size
;
405 ldpp_dout(dpp
, 20) << __func__
<< ": new_size start " << new_size
<< dendl
;
406 if (op
.truncate
&& op
.truncate
->first
< new_size
) {
407 ceph_assert(!op
.is_fresh_object());
408 new_size
= sinfo
.logical_to_next_stripe_offset(
410 ldpp_dout(dpp
, 20) << __func__
<< ": new_size truncate down "
411 << new_size
<< dendl
;
412 if (new_size
!= op
.truncate
->first
) { // 0 the unaligned part
414 bl
.append_zero(new_size
- op
.truncate
->first
);
419 append_after
= sinfo
.logical_to_prev_stripe_offset(
422 append_after
= new_size
;
426 std::numeric_limits
<uint64_t>::max() - new_size
);
428 if (entry
&& !op
.is_fresh_object()) {
429 uint64_t restore_from
= sinfo
.logical_to_prev_chunk_offset(
431 uint64_t restore_len
= sinfo
.aligned_logical_offset_to_chunk_offset(
433 sinfo
.logical_to_prev_stripe_offset(op
.truncate
->first
));
434 ceph_assert(rollback_extents
.empty());
436 ldpp_dout(dpp
, 20) << __func__
<< ": saving extent "
437 << make_pair(restore_from
, restore_len
)
439 ldpp_dout(dpp
, 20) << __func__
<< ": truncating to "
442 rollback_extents
.emplace_back(
443 make_pair(restore_from
, restore_len
));
444 for (auto &&st
: *transactions
) {
446 coll_t(spg_t(pgid
, st
.first
)),
447 ghobject_t(oid
, entry
->version
.version
, st
.first
));
448 st
.second
.clone_range(
449 coll_t(spg_t(pgid
, st
.first
)),
450 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
451 ghobject_t(oid
, entry
->version
.version
, st
.first
),
458 ldpp_dout(dpp
, 20) << __func__
<< ": not saving extents, fresh object"
461 for (auto &&st
: *transactions
) {
463 coll_t(spg_t(pgid
, st
.first
)),
464 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
465 sinfo
.aligned_logical_offset_to_chunk_offset(new_size
));
469 uint32_t fadvise_flags
= 0;
470 for (auto &&extent
: op
.buffer_updates
) {
471 using BufferUpdate
= PGTransaction::ObjectOperation::BufferUpdate
;
475 [&](const BufferUpdate::Write
&op
) {
477 fadvise_flags
|= op
.fadvise_flags
;
479 [&](const BufferUpdate::Zero
&) {
480 bl
.append_zero(extent
.get_len());
482 [&](const BufferUpdate::CloneRange
&) {
485 "CloneRange is not allowed, do_op should have returned ENOTSUPP");
488 uint64_t off
= extent
.get_off();
489 uint64_t len
= extent
.get_len();
490 uint64_t end
= off
+ len
;
491 ldpp_dout(dpp
, 20) << __func__
<< ": adding buffer_update "
492 << make_pair(off
, len
)
494 ceph_assert(len
> 0);
495 if (off
> new_size
) {
496 ceph_assert(off
> append_after
);
497 bl
.prepend_zero(off
- new_size
);
498 len
+= off
- new_size
;
499 ldpp_dout(dpp
, 20) << __func__
<< ": prepending zeroes to align "
500 << off
<< "->" << new_size
504 if (!sinfo
.logical_offset_is_stripe_aligned(end
) && (end
> append_after
)) {
505 uint64_t aligned_end
= sinfo
.logical_to_next_stripe_offset(
507 uint64_t tail
= aligned_end
- end
;
508 bl
.append_zero(tail
);
509 ldpp_dout(dpp
, 20) << __func__
<< ": appending zeroes to align end "
510 << end
<< "->" << end
+tail
511 << ", len: " << len
<< "->" << len
+tail
517 to_write
.insert(off
, len
, bl
);
523 op
.truncate
->second
> new_size
) {
524 ceph_assert(op
.truncate
->second
> append_after
);
525 uint64_t truncate_to
=
526 sinfo
.logical_to_next_stripe_offset(
527 op
.truncate
->second
);
528 uint64_t zeroes
= truncate_to
- new_size
;
530 bl
.append_zero(zeroes
);
535 new_size
= truncate_to
;
536 ldpp_dout(dpp
, 20) << __func__
<< ": truncating out to "
542 for (unsigned i
= 0; i
< ecimpl
->get_chunk_count(); ++i
) {
545 auto to_overwrite
= to_write
.intersect(0, append_after
);
546 ldpp_dout(dpp
, 20) << __func__
<< ": to_overwrite: "
549 for (auto &&extent
: to_overwrite
) {
550 ceph_assert(extent
.get_off() + extent
.get_len() <= append_after
);
551 ceph_assert(sinfo
.logical_offset_is_stripe_aligned(extent
.get_off()));
552 ceph_assert(sinfo
.logical_offset_is_stripe_aligned(extent
.get_len()));
554 uint64_t restore_from
= sinfo
.aligned_logical_offset_to_chunk_offset(
556 uint64_t restore_len
= sinfo
.aligned_logical_offset_to_chunk_offset(
558 ldpp_dout(dpp
, 20) << __func__
<< ": overwriting "
559 << restore_from
<< "~" << restore_len
561 if (rollback_extents
.empty()) {
562 for (auto &&st
: *transactions
) {
564 coll_t(spg_t(pgid
, st
.first
)),
565 ghobject_t(oid
, entry
->version
.version
, st
.first
));
568 rollback_extents
.emplace_back(make_pair(restore_from
, restore_len
));
569 for (auto &&st
: *transactions
) {
570 st
.second
.clone_range(
571 coll_t(spg_t(pgid
, st
.first
)),
572 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
573 ghobject_t(oid
, entry
->version
.version
, st
.first
),
594 auto to_append
= to_write
.intersect(
596 std::numeric_limits
<uint64_t>::max() - append_after
);
597 ldpp_dout(dpp
, 20) << __func__
<< ": to_append: "
600 for (auto &&extent
: to_append
) {
601 ceph_assert(sinfo
.logical_offset_is_stripe_aligned(extent
.get_off()));
602 ceph_assert(sinfo
.logical_offset_is_stripe_aligned(extent
.get_len()));
603 ldpp_dout(dpp
, 20) << __func__
<< ": appending "
604 << extent
.get_off() << "~" << extent
.get_len()
621 ldpp_dout(dpp
, 20) << __func__
<< ": " << oid
622 << " resetting hinfo to logical size "
625 if (!rollback_extents
.empty() && entry
) {
627 ldpp_dout(dpp
, 20) << __func__
<< ": " << oid
628 << " marking rollback extents "
631 entry
->mod_desc
.rollback_extents(
632 entry
->version
.version
, rollback_extents
);
634 hinfo
->set_total_chunk_size_clear_hash(
635 sinfo
.aligned_logical_offset_to_chunk_offset(new_size
));
637 ceph_assert(hinfo
->get_total_logical_size(sinfo
) == new_size
);
640 if (entry
&& !to_append
.empty()) {
641 ldpp_dout(dpp
, 20) << __func__
<< ": marking append "
644 entry
->mod_desc
.append(append_after
);
647 if (!op
.is_delete()) {
649 encode(*hinfo
, hbuf
);
650 for (auto &&i
: *transactions
) {
652 coll_t(spg_t(pgid
, i
.first
)),
653 ghobject_t(oid
, ghobject_t::NO_GEN
, i
.first
),
654 ECUtil::get_hinfo_key(),