// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
19 #include "ECTransaction.h"
21 #include "os/ObjectStore.h"
22 #include "common/inline_variant.h"
32 using ceph::bufferlist
;
35 using ceph::ErasureCodeInterfaceRef
;
37 void encode_and_write(
40 const ECUtil::stripe_info_t
&sinfo
,
41 ErasureCodeInterfaceRef
&ecimpl
,
46 ECUtil::HashInfoRef hinfo
,
48 map
<shard_id_t
, ObjectStore::Transaction
> *transactions
,
49 DoutPrefixProvider
*dpp
) {
50 const uint64_t before_size
= hinfo
->get_total_logical_size(sinfo
);
51 ceph_assert(sinfo
.logical_offset_is_stripe_aligned(offset
));
52 ceph_assert(sinfo
.logical_offset_is_stripe_aligned(bl
.length()));
53 ceph_assert(bl
.length());
55 map
<int, bufferlist
> buffers
;
56 int r
= ECUtil::encode(
57 sinfo
, ecimpl
, bl
, want
, &buffers
);
60 written
.insert(offset
, bl
.length(), bl
);
62 ldpp_dout(dpp
, 20) << __func__
<< ": " << oid
64 << offset
+ bl
.length()
67 if (offset
>= before_size
) {
68 ceph_assert(offset
== before_size
);
70 sinfo
.aligned_logical_offset_to_chunk_offset(offset
),
74 for (auto &&i
: *transactions
) {
75 ceph_assert(buffers
.count(i
.first
));
76 bufferlist
&enc_bl
= buffers
[i
.first
];
77 if (offset
>= before_size
) {
78 i
.second
.set_alloc_hint(
79 coll_t(spg_t(pgid
, i
.first
)),
80 ghobject_t(oid
, ghobject_t::NO_GEN
, i
.first
),
82 CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE
|
83 CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY
);
86 coll_t(spg_t(pgid
, i
.first
)),
87 ghobject_t(oid
, ghobject_t::NO_GEN
, i
.first
),
88 sinfo
.logical_to_prev_chunk_offset(
96 bool ECTransaction::requires_overwrite(
98 const PGTransaction::ObjectOperation
&op
) {
99 // special handling for truncates to 0
100 if (op
.truncate
&& op
.truncate
->first
== 0)
102 return op
.is_none() &&
103 ((!op
.buffer_updates
.empty() &&
104 (op
.buffer_updates
.begin().get_off() < prev_size
)) ||
106 (op
.truncate
->first
< prev_size
)));
109 void ECTransaction::generate_transactions(
111 ErasureCodeInterfaceRef
&ecimpl
,
113 const ECUtil::stripe_info_t
&sinfo
,
114 const map
<hobject_t
,extent_map
> &partial_extents
,
115 vector
<pg_log_entry_t
> &entries
,
116 map
<hobject_t
,extent_map
> *written_map
,
117 map
<shard_id_t
, ObjectStore::Transaction
> *transactions
,
118 set
<hobject_t
> *temp_added
,
119 set
<hobject_t
> *temp_removed
,
120 DoutPrefixProvider
*dpp
,
121 const ceph_release_t require_osd_release
)
123 ceph_assert(written_map
);
124 ceph_assert(transactions
);
125 ceph_assert(temp_added
);
126 ceph_assert(temp_removed
);
130 auto &hash_infos
= plan
.hash_infos
;
132 map
<hobject_t
, pg_log_entry_t
*> obj_to_log
;
133 for (auto &&i
: entries
) {
134 obj_to_log
.insert(make_pair(i
.soid
, &i
));
137 t
.safe_create_traverse(
138 [&](pair
<const hobject_t
, PGTransaction::ObjectOperation
> &opair
) {
139 const hobject_t
&oid
= opair
.first
;
140 auto &op
= opair
.second
;
141 auto &obc_map
= t
.obc_map
;
142 auto &written
= (*written_map
)[oid
];
144 auto iter
= obj_to_log
.find(oid
);
145 pg_log_entry_t
*entry
= iter
!= obj_to_log
.end() ? iter
->second
: nullptr;
147 ObjectContextRef obc
;
148 auto obiter
= t
.obc_map
.find(oid
);
149 if (obiter
!= t
.obc_map
.end()) {
150 obc
= obiter
->second
;
155 ceph_assert(oid
.is_temp());
158 ECUtil::HashInfoRef hinfo
;
160 auto iter
= hash_infos
.find(oid
);
161 ceph_assert(iter
!= hash_infos
.end());
162 hinfo
= iter
->second
;
166 if (op
.is_fresh_object()) {
167 temp_added
->insert(oid
);
168 } else if (op
.is_delete()) {
169 temp_removed
->insert(oid
);
174 entry
->is_modify() &&
176 bufferlist
bl(op
.updated_snaps
->second
.size() * 8 + 8);
177 encode(op
.updated_snaps
->second
, bl
);
178 entry
->snaps
.swap(bl
);
179 entry
->snaps
.reassign_to_mempool(mempool::mempool_osd_pglog
);
182 ldpp_dout(dpp
, 20) << "generate_transactions: "
184 << ", current size is "
185 << hinfo
->get_total_logical_size(sinfo
)
190 ldpp_dout(dpp
, 20) << "generate_transactions: "
196 if (entry
&& op
.updated_snaps
) {
197 entry
->mod_desc
.update_snaps(op
.updated_snaps
->first
);
200 map
<string
, std::optional
<bufferlist
> > xattr_rollback
;
202 bufferlist old_hinfo
;
203 encode(*hinfo
, old_hinfo
);
204 xattr_rollback
[ECUtil::get_hinfo_key()] = old_hinfo
;
206 if (op
.is_none() && op
.truncate
&& op
.truncate
->first
== 0) {
207 ceph_assert(op
.truncate
->first
== 0);
208 ceph_assert(op
.truncate
->first
==
209 op
.truncate
->second
);
213 if (op
.truncate
->first
!= op
.truncate
->second
) {
214 op
.truncate
->first
= op
.truncate
->second
;
216 op
.truncate
= std::nullopt
;
219 op
.delete_first
= true;
220 op
.init_type
= PGTransaction::ObjectOperation::Init::Create();
223 /* We need to reapply all of the cached xattrs.
224 * std::map insert fortunately only writes keys
225 * which don't already exist, so this should do
226 * the right thing. */
227 op
.attr_updates
.insert(
228 obc
->attr_cache
.begin(),
229 obc
->attr_cache
.end());
233 if (op
.delete_first
) {
234 /* We also want to remove the std::nullopt entries since
235 * the keys already won't exist */
236 for (auto j
= op
.attr_updates
.begin();
237 j
!= op
.attr_updates
.end();
242 op
.attr_updates
.erase(j
++);
245 /* Fill in all current entries for xattr rollback */
247 xattr_rollback
.insert(
248 obc
->attr_cache
.begin(),
249 obc
->attr_cache
.end());
250 obc
->attr_cache
.clear();
253 entry
->mod_desc
.rmobject(entry
->version
.version
);
254 for (auto &&st
: *transactions
) {
255 st
.second
.collection_move_rename(
256 coll_t(spg_t(pgid
, st
.first
)),
257 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
258 coll_t(spg_t(pgid
, st
.first
)),
259 ghobject_t(oid
, entry
->version
.version
, st
.first
));
262 for (auto &&st
: *transactions
) {
264 coll_t(spg_t(pgid
, st
.first
)),
265 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
));
271 if (op
.is_fresh_object() && entry
) {
272 entry
->mod_desc
.create();
277 [&](const PGTransaction::ObjectOperation::Init::None
&) {},
278 [&](const PGTransaction::ObjectOperation::Init::Create
&op
) {
279 for (auto &&st
: *transactions
) {
280 if (require_osd_release
>= ceph_release_t::octopus
) {
282 coll_t(spg_t(pgid
, st
.first
)),
283 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
));
286 coll_t(spg_t(pgid
, st
.first
)),
287 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
));
291 [&](const PGTransaction::ObjectOperation::Init::Clone
&op
) {
292 for (auto &&st
: *transactions
) {
294 coll_t(spg_t(pgid
, st
.first
)),
295 ghobject_t(op
.source
, ghobject_t::NO_GEN
, st
.first
),
296 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
));
299 auto siter
= hash_infos
.find(op
.source
);
300 ceph_assert(siter
!= hash_infos
.end());
301 hinfo
->update_to(*(siter
->second
));
304 auto cobciter
= obc_map
.find(op
.source
);
305 ceph_assert(cobciter
!= obc_map
.end());
306 obc
->attr_cache
= cobciter
->second
->attr_cache
;
309 [&](const PGTransaction::ObjectOperation::Init::Rename
&op
) {
310 ceph_assert(op
.source
.is_temp());
311 for (auto &&st
: *transactions
) {
312 st
.second
.collection_move_rename(
313 coll_t(spg_t(pgid
, st
.first
)),
314 ghobject_t(op
.source
, ghobject_t::NO_GEN
, st
.first
),
315 coll_t(spg_t(pgid
, st
.first
)),
316 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
));
318 auto siter
= hash_infos
.find(op
.source
);
319 ceph_assert(siter
!= hash_infos
.end());
320 hinfo
->update_to(*(siter
->second
));
322 auto cobciter
= obc_map
.find(op
.source
);
323 ceph_assert(cobciter
== obc_map
.end());
324 obc
->attr_cache
.clear();
328 // omap not supported (except 0, handled above)
329 ceph_assert(!(op
.clear_omap
));
330 ceph_assert(!(op
.omap_header
));
331 ceph_assert(op
.omap_updates
.empty());
333 if (!op
.attr_updates
.empty()) {
334 map
<string
, bufferlist
, less
<>> to_set
;
335 for (auto &&j
: op
.attr_updates
) {
337 to_set
[j
.first
] = *(j
.second
);
339 for (auto &&st
: *transactions
) {
341 coll_t(spg_t(pgid
, st
.first
)),
342 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
347 auto citer
= obc
->attr_cache
.find(j
.first
);
349 if (citer
!= obc
->attr_cache
.end()) {
350 // won't overwrite anything we put in earlier
351 xattr_rollback
.insert(
354 std::optional
<bufferlist
>(citer
->second
)));
356 // won't overwrite anything we put in earlier
357 xattr_rollback
.insert(
364 obc
->attr_cache
[j
.first
] = *(j
.second
);
365 } else if (citer
!= obc
->attr_cache
.end()) {
366 obc
->attr_cache
.erase(citer
);
372 for (auto &&st
: *transactions
) {
374 coll_t(spg_t(pgid
, st
.first
)),
375 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
378 ceph_assert(!xattr_rollback
.empty());
380 if (entry
&& !xattr_rollback
.empty()) {
381 entry
->mod_desc
.setattrs(xattr_rollback
);
385 /* logical_to_next_chunk_offset() scales down both aligned and
388 * we don't bother to roll this back at this time for two reasons:
390 * 2) we don't track the old value */
391 uint64_t object_size
= sinfo
.logical_to_next_chunk_offset(
392 op
.alloc_hint
->expected_object_size
);
393 uint64_t write_size
= sinfo
.logical_to_next_chunk_offset(
394 op
.alloc_hint
->expected_write_size
);
396 for (auto &&st
: *transactions
) {
397 st
.second
.set_alloc_hint(
398 coll_t(spg_t(pgid
, st
.first
)),
399 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
402 op
.alloc_hint
->flags
);
407 auto pextiter
= partial_extents
.find(oid
);
408 if (pextiter
!= partial_extents
.end()) {
409 to_write
= pextiter
->second
;
412 vector
<pair
<uint64_t, uint64_t> > rollback_extents
;
413 const uint64_t orig_size
= hinfo
->get_total_logical_size(sinfo
);
415 uint64_t new_size
= orig_size
;
416 uint64_t append_after
= new_size
;
417 ldpp_dout(dpp
, 20) << __func__
<< ": new_size start " << new_size
<< dendl
;
418 if (op
.truncate
&& op
.truncate
->first
< new_size
) {
419 ceph_assert(!op
.is_fresh_object());
420 new_size
= sinfo
.logical_to_next_stripe_offset(
422 ldpp_dout(dpp
, 20) << __func__
<< ": new_size truncate down "
423 << new_size
<< dendl
;
424 if (new_size
!= op
.truncate
->first
) { // 0 the unaligned part
426 bl
.append_zero(new_size
- op
.truncate
->first
);
431 append_after
= sinfo
.logical_to_prev_stripe_offset(
434 append_after
= new_size
;
438 std::numeric_limits
<uint64_t>::max() - new_size
);
440 if (entry
&& !op
.is_fresh_object()) {
441 uint64_t restore_from
= sinfo
.logical_to_prev_chunk_offset(
443 uint64_t restore_len
= sinfo
.aligned_logical_offset_to_chunk_offset(
445 sinfo
.logical_to_prev_stripe_offset(op
.truncate
->first
));
446 ceph_assert(rollback_extents
.empty());
448 ldpp_dout(dpp
, 20) << __func__
<< ": saving extent "
449 << make_pair(restore_from
, restore_len
)
451 ldpp_dout(dpp
, 20) << __func__
<< ": truncating to "
454 rollback_extents
.emplace_back(
455 make_pair(restore_from
, restore_len
));
456 for (auto &&st
: *transactions
) {
458 coll_t(spg_t(pgid
, st
.first
)),
459 ghobject_t(oid
, entry
->version
.version
, st
.first
));
460 st
.second
.clone_range(
461 coll_t(spg_t(pgid
, st
.first
)),
462 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
463 ghobject_t(oid
, entry
->version
.version
, st
.first
),
470 ldpp_dout(dpp
, 20) << __func__
<< ": not saving extents, fresh object"
473 for (auto &&st
: *transactions
) {
475 coll_t(spg_t(pgid
, st
.first
)),
476 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
477 sinfo
.aligned_logical_offset_to_chunk_offset(new_size
));
481 uint32_t fadvise_flags
= 0;
482 for (auto &&extent
: op
.buffer_updates
) {
483 using BufferUpdate
= PGTransaction::ObjectOperation::BufferUpdate
;
487 [&](const BufferUpdate::Write
&op
) {
489 fadvise_flags
|= op
.fadvise_flags
;
491 [&](const BufferUpdate::Zero
&) {
492 bl
.append_zero(extent
.get_len());
494 [&](const BufferUpdate::CloneRange
&) {
497 "CloneRange is not allowed, do_op should have returned ENOTSUPP");
500 uint64_t off
= extent
.get_off();
501 uint64_t len
= extent
.get_len();
502 uint64_t end
= off
+ len
;
503 ldpp_dout(dpp
, 20) << __func__
<< ": adding buffer_update "
504 << make_pair(off
, len
)
506 ceph_assert(len
> 0);
507 if (off
> new_size
) {
508 ceph_assert(off
> append_after
);
509 bl
.prepend_zero(off
- new_size
);
510 len
+= off
- new_size
;
511 ldpp_dout(dpp
, 20) << __func__
<< ": prepending zeroes to align "
512 << off
<< "->" << new_size
516 if (!sinfo
.logical_offset_is_stripe_aligned(end
) && (end
> append_after
)) {
517 uint64_t aligned_end
= sinfo
.logical_to_next_stripe_offset(
519 uint64_t tail
= aligned_end
- end
;
520 bl
.append_zero(tail
);
521 ldpp_dout(dpp
, 20) << __func__
<< ": appending zeroes to align end "
522 << end
<< "->" << end
+tail
523 << ", len: " << len
<< "->" << len
+tail
529 to_write
.insert(off
, len
, bl
);
535 op
.truncate
->second
> new_size
) {
536 ceph_assert(op
.truncate
->second
> append_after
);
537 uint64_t truncate_to
=
538 sinfo
.logical_to_next_stripe_offset(
539 op
.truncate
->second
);
540 uint64_t zeroes
= truncate_to
- new_size
;
542 bl
.append_zero(zeroes
);
547 new_size
= truncate_to
;
548 ldpp_dout(dpp
, 20) << __func__
<< ": truncating out to "
554 for (unsigned i
= 0; i
< ecimpl
->get_chunk_count(); ++i
) {
557 auto to_overwrite
= to_write
.intersect(0, append_after
);
558 ldpp_dout(dpp
, 20) << __func__
<< ": to_overwrite: "
561 for (auto &&extent
: to_overwrite
) {
562 ceph_assert(extent
.get_off() + extent
.get_len() <= append_after
);
563 ceph_assert(sinfo
.logical_offset_is_stripe_aligned(extent
.get_off()));
564 ceph_assert(sinfo
.logical_offset_is_stripe_aligned(extent
.get_len()));
566 uint64_t restore_from
= sinfo
.aligned_logical_offset_to_chunk_offset(
568 uint64_t restore_len
= sinfo
.aligned_logical_offset_to_chunk_offset(
570 ldpp_dout(dpp
, 20) << __func__
<< ": overwriting "
571 << restore_from
<< "~" << restore_len
573 if (rollback_extents
.empty()) {
574 for (auto &&st
: *transactions
) {
576 coll_t(spg_t(pgid
, st
.first
)),
577 ghobject_t(oid
, entry
->version
.version
, st
.first
));
580 rollback_extents
.emplace_back(make_pair(restore_from
, restore_len
));
581 for (auto &&st
: *transactions
) {
582 st
.second
.clone_range(
583 coll_t(spg_t(pgid
, st
.first
)),
584 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
585 ghobject_t(oid
, entry
->version
.version
, st
.first
),
606 auto to_append
= to_write
.intersect(
608 std::numeric_limits
<uint64_t>::max() - append_after
);
609 ldpp_dout(dpp
, 20) << __func__
<< ": to_append: "
612 for (auto &&extent
: to_append
) {
613 ceph_assert(sinfo
.logical_offset_is_stripe_aligned(extent
.get_off()));
614 ceph_assert(sinfo
.logical_offset_is_stripe_aligned(extent
.get_len()));
615 ldpp_dout(dpp
, 20) << __func__
<< ": appending "
616 << extent
.get_off() << "~" << extent
.get_len()
633 ldpp_dout(dpp
, 20) << __func__
<< ": " << oid
634 << " resetting hinfo to logical size "
637 if (!rollback_extents
.empty() && entry
) {
639 ldpp_dout(dpp
, 20) << __func__
<< ": " << oid
640 << " marking rollback extents "
643 entry
->mod_desc
.rollback_extents(
644 entry
->version
.version
, rollback_extents
);
646 hinfo
->set_total_chunk_size_clear_hash(
647 sinfo
.aligned_logical_offset_to_chunk_offset(new_size
));
649 ceph_assert(hinfo
->get_total_logical_size(sinfo
) == new_size
);
652 if (entry
&& !to_append
.empty()) {
653 ldpp_dout(dpp
, 20) << __func__
<< ": marking append "
656 entry
->mod_desc
.append(append_after
);
659 if (!op
.is_delete()) {
661 encode(*hinfo
, hbuf
);
662 for (auto &&i
: *transactions
) {
664 coll_t(spg_t(pgid
, i
.first
)),
665 ghobject_t(oid
, ghobject_t::NO_GEN
, i
.first
),
666 ECUtil::get_hinfo_key(),