1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013 Inktank Storage, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
20 #include "ECTransaction.h"
22 #include "os/ObjectStore.h"
23 #include "common/inline_variant.h"
// encode_and_write: erasure-encodes `bl` into per-shard buffers via
// ECUtil::encode(), records the written extent in `written`, and then walks
// every (shard, transaction) pair in *transactions to issue per-shard
// operations (a set_alloc_hint call is visible below).
//
// Visible parameters:
//   sinfo        - stripe geometry; used for alignment checks and for
//                  logical-offset -> chunk-offset conversions.
//   ecimpl       - erasure code implementation handed to ECUtil::encode().
//   hinfo        - hash info; queried for the object's total logical size.
//   transactions - map keyed by shard id; one ObjectStore::Transaction each.
//   dpp          - prefix provider for the ldpp_dout() debug output.
//
// NOTE(review): the embedded line numbering jumps (26 -> 29, 30 -> 35, ...),
// so some parameter lines are missing from this listing. `pgid`, `oid`,
// `offset`, `bl`, `want` and `written` are used below without a visible
// declaration -- presumably they are the missing parameters; confirm against
// the full file.
26 void encode_and_write(
29 const ECUtil::stripe_info_t
&sinfo
,
30 ErasureCodeInterfaceRef
&ecimpl
,
35 ECUtil::HashInfoRef hinfo
,
37 map
<shard_id_t
, ObjectStore::Transaction
> *transactions
,
38 DoutPrefixProvider
*dpp
) {
// Logical size reported by hinfo before this write is applied; it is what
// `offset` gets compared against further down.
39 const uint64_t before_size
= hinfo
->get_total_logical_size(sinfo
);
// Both the start offset and the length of `bl` must be stripe aligned.
40 assert(sinfo
.logical_offset_is_stripe_aligned(offset
));
41 assert(sinfo
.logical_offset_is_stripe_aligned(bl
.length()));
// Encoded output, keyed by an int that is later looked up with the shard
// key of each transaction (see buffers[i.first] below).
44 map
<int, bufferlist
> buffers
;
// NOTE(review): no use of `r` is visible in this listing -- presumably it is
// checked on one of the missing lines; confirm.
45 int r
= ECUtil::encode(
46 sinfo
, ecimpl
, bl
, want
, &buffers
);
// Remember the logical extent [offset, offset + bl.length()) and its data.
49 written
.insert(offset
, bl
.length(), bl
);
51 ldpp_dout(dpp
, 20) << __func__
<< ": " << oid
53 << offset
+ bl
.length()
// A write starting at or beyond the previous size must start exactly at the
// previous size (asserted below).
56 if (offset
>= before_size
) {
57 assert(offset
== before_size
);
// NOTE(review): the call that takes this chunk offset as an argument is on a
// line missing from this listing (numbering jumps 57 -> 59).
59 sinfo
.aligned_logical_offset_to_chunk_offset(offset
),
// One iteration per shard: i.first is the shard key, i.second its
// transaction.
63 for (auto &&i
: *transactions
) {
// The encode step must have produced a buffer for every shard we write to.
64 assert(buffers
.count(i
.first
));
65 bufferlist
&enc_bl
= buffers
[i
.first
];
// When offset >= before_size, hint the store that this shard object is
// written sequentially and append-only.
66 if (offset
>= before_size
) {
67 i
.second
.set_alloc_hint(
68 coll_t(spg_t(pgid
, i
.first
)),
69 ghobject_t(oid
, ghobject_t::NO_GEN
, i
.first
),
71 CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE
|
72 CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY
);
// NOTE(review): the call these arguments belong to is not visible (numbering
// jumps 72 -> 75), and the remainder of the function after the
// logical_to_prev_chunk_offset( call is missing from this listing as well.
75 coll_t(spg_t(pgid
, i
.first
)),
76 ghobject_t(oid
, ghobject_t::NO_GEN
, i
.first
),
77 sinfo
.logical_to_prev_chunk_offset(
// ECTransaction::requires_overwrite: boolean predicate over an object
// operation `op` and a size `prev_size`.
//
// The visible return expression is true only when op.is_none() holds and at
// least one of the following is true:
//   - op.buffer_updates is non-empty and its first entry starts at an
//     offset below prev_size, or
//   - op.truncate->first is below prev_size.
//
// NOTE(review): the embedded line numbering skips 86, 90 and 94, so this
// listing is missing lines. `prev_size` has no visible declaration
// (presumably the parameter on line 86), and part of the boolean expression
// (line 94) is absent; confirm against the full file.
85 bool ECTransaction::requires_overwrite(
87 const PGTransaction::ObjectOperation
&op
) {
88 // special handling for truncates to 0
// NOTE(review): the statement controlled by this `if` is on embedded line 90,
// which is not visible here; the `return` on line 91 is presumably a
// separate statement rather than the body of the `if` -- TODO confirm.
89 if (op
.truncate
&& op
.truncate
->first
== 0)
91 return op
.is_none() &&
// First alternative: a buffer update that begins below the previous size.
92 ((!op
.buffer_updates
.empty() &&
93 (op
.buffer_updates
.begin().get_off() < prev_size
)) ||
// Second alternative: a truncate whose first value is below the previous
// size.
95 (op
.truncate
->first
< prev_size
)));
98 void ECTransaction::generate_transactions(
100 ErasureCodeInterfaceRef
&ecimpl
,
102 bool legacy_log_entries
,
103 const ECUtil::stripe_info_t
&sinfo
,
104 const map
<hobject_t
,extent_map
> &partial_extents
,
105 vector
<pg_log_entry_t
> &entries
,
106 map
<hobject_t
,extent_map
> *written_map
,
107 map
<shard_id_t
, ObjectStore::Transaction
> *transactions
,
108 set
<hobject_t
> *temp_added
,
109 set
<hobject_t
> *temp_removed
,
110 DoutPrefixProvider
*dpp
)
113 assert(transactions
);
115 assert(temp_removed
);
119 auto &hash_infos
= plan
.hash_infos
;
121 assert(transactions
);
123 assert(temp_removed
);
125 map
<hobject_t
, pg_log_entry_t
*> obj_to_log
;
126 for (auto &&i
: entries
) {
127 obj_to_log
.insert(make_pair(i
.soid
, &i
));
130 t
.safe_create_traverse(
131 [&](pair
<const hobject_t
, PGTransaction::ObjectOperation
> &opair
) {
132 const hobject_t
&oid
= opair
.first
;
133 auto &op
= opair
.second
;
134 auto &obc_map
= t
.obc_map
;
135 auto &written
= (*written_map
)[oid
];
137 auto iter
= obj_to_log
.find(oid
);
138 pg_log_entry_t
*entry
= iter
!= obj_to_log
.end() ? iter
->second
: nullptr;
140 ObjectContextRef obc
;
141 auto obiter
= t
.obc_map
.find(oid
);
142 if (obiter
!= t
.obc_map
.end()) {
143 obc
= obiter
->second
;
148 assert(oid
.is_temp());
151 ECUtil::HashInfoRef hinfo
;
153 auto iter
= hash_infos
.find(oid
);
154 assert(iter
!= hash_infos
.end());
155 hinfo
= iter
->second
;
159 if (op
.is_fresh_object()) {
160 temp_added
->insert(oid
);
161 } else if (op
.is_delete()) {
162 temp_removed
->insert(oid
);
167 entry
->is_modify() &&
169 bufferlist
bl(op
.updated_snaps
->second
.size() * 8 + 8);
170 ::encode(op
.updated_snaps
->second
, bl
);
171 entry
->snaps
.swap(bl
);
172 entry
->snaps
.reassign_to_mempool(mempool::mempool_osd_pglog
);
175 ldpp_dout(dpp
, 20) << "generate_transactions: "
177 << ", current size is "
178 << hinfo
->get_total_logical_size(sinfo
)
183 ldpp_dout(dpp
, 20) << "generate_transactions: "
189 if (entry
&& op
.updated_snaps
) {
190 entry
->mod_desc
.update_snaps(op
.updated_snaps
->first
);
193 map
<string
, boost::optional
<bufferlist
> > xattr_rollback
;
195 bufferlist old_hinfo
;
196 ::encode(*hinfo
, old_hinfo
);
197 xattr_rollback
[ECUtil::get_hinfo_key()] = old_hinfo
;
199 if (op
.is_none() && op
.truncate
&& op
.truncate
->first
== 0) {
200 assert(op
.truncate
->first
== 0);
201 assert(op
.truncate
->first
==
202 op
.truncate
->second
);
206 if (op
.truncate
->first
!= op
.truncate
->second
) {
207 op
.truncate
->first
= op
.truncate
->second
;
209 op
.truncate
= boost::none
;
212 op
.delete_first
= true;
213 op
.init_type
= PGTransaction::ObjectOperation::Init::Create();
216 /* We need to reapply all of the cached xattrs.
217 * std::map insert fortunately only writes keys
218 * which don't already exist, so this should do
219 * the right thing. */
220 op
.attr_updates
.insert(
221 obc
->attr_cache
.begin(),
222 obc
->attr_cache
.end());
226 if (op
.delete_first
) {
227 /* We also want to remove the boost::none entries since
228 * the keys already won't exist */
229 for (auto j
= op
.attr_updates
.begin();
230 j
!= op
.attr_updates
.end();
235 op
.attr_updates
.erase(j
++);
238 /* Fill in all current entries for xattr rollback */
240 xattr_rollback
.insert(
241 obc
->attr_cache
.begin(),
242 obc
->attr_cache
.end());
243 obc
->attr_cache
.clear();
246 entry
->mod_desc
.rmobject(entry
->version
.version
);
247 for (auto &&st
: *transactions
) {
248 st
.second
.collection_move_rename(
249 coll_t(spg_t(pgid
, st
.first
)),
250 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
251 coll_t(spg_t(pgid
, st
.first
)),
252 ghobject_t(oid
, entry
->version
.version
, st
.first
));
255 for (auto &&st
: *transactions
) {
257 coll_t(spg_t(pgid
, st
.first
)),
258 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
));
264 if (op
.is_fresh_object() && entry
) {
265 entry
->mod_desc
.create();
270 [&](const PGTransaction::ObjectOperation::Init::None
&) {},
271 [&](const PGTransaction::ObjectOperation::Init::Create
&op
) {
272 for (auto &&st
: *transactions
) {
274 coll_t(spg_t(pgid
, st
.first
)),
275 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
));
278 [&](const PGTransaction::ObjectOperation::Init::Clone
&op
) {
279 for (auto &&st
: *transactions
) {
281 coll_t(spg_t(pgid
, st
.first
)),
282 ghobject_t(op
.source
, ghobject_t::NO_GEN
, st
.first
),
283 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
));
286 auto siter
= hash_infos
.find(op
.source
);
287 assert(siter
!= hash_infos
.end());
288 hinfo
->update_to(*(siter
->second
));
291 auto cobciter
= obc_map
.find(op
.source
);
292 assert(cobciter
!= obc_map
.end());
293 obc
->attr_cache
= cobciter
->second
->attr_cache
;
296 [&](const PGTransaction::ObjectOperation::Init::Rename
&op
) {
297 assert(op
.source
.is_temp());
298 for (auto &&st
: *transactions
) {
299 st
.second
.collection_move_rename(
300 coll_t(spg_t(pgid
, st
.first
)),
301 ghobject_t(op
.source
, ghobject_t::NO_GEN
, st
.first
),
302 coll_t(spg_t(pgid
, st
.first
)),
303 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
));
305 auto siter
= hash_infos
.find(op
.source
);
306 assert(siter
!= hash_infos
.end());
307 hinfo
->update_to(*(siter
->second
));
309 auto cobciter
= obc_map
.find(op
.source
);
310 assert(cobciter
== obc_map
.end());
311 obc
->attr_cache
.clear();
315 // omap not supported (except 0, handled above)
316 assert(!(op
.clear_omap
));
317 assert(!(op
.omap_header
));
318 assert(op
.omap_updates
.empty());
320 if (!op
.attr_updates
.empty()) {
321 map
<string
, bufferlist
> to_set
;
322 for (auto &&j
: op
.attr_updates
) {
324 to_set
[j
.first
] = *(j
.second
);
326 for (auto &&st
: *transactions
) {
328 coll_t(spg_t(pgid
, st
.first
)),
329 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
334 auto citer
= obc
->attr_cache
.find(j
.first
);
336 if (citer
!= obc
->attr_cache
.end()) {
337 // won't overwrite anything we put in earlier
338 xattr_rollback
.insert(
341 boost::optional
<bufferlist
>(citer
->second
)));
343 // won't overwrite anything we put in earlier
344 xattr_rollback
.insert(
351 obc
->attr_cache
[j
.first
] = *(j
.second
);
352 } else if (citer
!= obc
->attr_cache
.end()) {
353 obc
->attr_cache
.erase(citer
);
359 for (auto &&st
: *transactions
) {
361 coll_t(spg_t(pgid
, st
.first
)),
362 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
365 assert(!xattr_rollback
.empty());
367 if (entry
&& !xattr_rollback
.empty()) {
368 entry
->mod_desc
.setattrs(xattr_rollback
);
372 /* logical_to_next_chunk_offset() scales down both aligned and
375 * we don't bother to roll this back at this time for two reasons:
377 * 2) we don't track the old value */
378 uint64_t object_size
= sinfo
.logical_to_next_chunk_offset(
379 op
.alloc_hint
->expected_object_size
);
380 uint64_t write_size
= sinfo
.logical_to_next_chunk_offset(
381 op
.alloc_hint
->expected_write_size
);
383 for (auto &&st
: *transactions
) {
384 st
.second
.set_alloc_hint(
385 coll_t(spg_t(pgid
, st
.first
)),
386 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
389 op
.alloc_hint
->flags
);
394 auto pextiter
= partial_extents
.find(oid
);
395 if (pextiter
!= partial_extents
.end()) {
396 to_write
= pextiter
->second
;
399 vector
<pair
<uint64_t, uint64_t> > rollback_extents
;
400 const uint64_t orig_size
= hinfo
->get_total_logical_size(sinfo
);
402 uint64_t new_size
= orig_size
;
403 uint64_t append_after
= new_size
;
404 ldpp_dout(dpp
, 20) << __func__
<< ": new_size start " << new_size
<< dendl
;
405 if (op
.truncate
&& op
.truncate
->first
< new_size
) {
406 assert(!op
.is_fresh_object());
407 new_size
= sinfo
.logical_to_next_stripe_offset(
409 ldpp_dout(dpp
, 20) << __func__
<< ": new_size truncate down "
410 << new_size
<< dendl
;
411 if (new_size
!= op
.truncate
->first
) { // 0 the unaligned part
413 bl
.append_zero(new_size
- op
.truncate
->first
);
418 append_after
= sinfo
.logical_to_prev_stripe_offset(
421 append_after
= new_size
;
425 std::numeric_limits
<uint64_t>::max() - new_size
);
427 if (entry
&& !op
.is_fresh_object()) {
428 uint64_t restore_from
= sinfo
.logical_to_prev_chunk_offset(
430 uint64_t restore_len
= sinfo
.aligned_logical_offset_to_chunk_offset(
432 sinfo
.logical_to_prev_stripe_offset(op
.truncate
->first
));
433 assert(rollback_extents
.empty());
435 ldpp_dout(dpp
, 20) << __func__
<< ": saving extent "
436 << make_pair(restore_from
, restore_len
)
438 ldpp_dout(dpp
, 20) << __func__
<< ": truncating to "
441 rollback_extents
.emplace_back(
442 make_pair(restore_from
, restore_len
));
443 for (auto &&st
: *transactions
) {
445 coll_t(spg_t(pgid
, st
.first
)),
446 ghobject_t(oid
, entry
->version
.version
, st
.first
));
447 st
.second
.clone_range(
448 coll_t(spg_t(pgid
, st
.first
)),
449 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
450 ghobject_t(oid
, entry
->version
.version
, st
.first
),
457 ldpp_dout(dpp
, 20) << __func__
<< ": not saving extents, fresh object"
460 for (auto &&st
: *transactions
) {
462 coll_t(spg_t(pgid
, st
.first
)),
463 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
464 sinfo
.aligned_logical_offset_to_chunk_offset(new_size
));
468 uint32_t fadvise_flags
= 0;
469 for (auto &&extent
: op
.buffer_updates
) {
470 using BufferUpdate
= PGTransaction::ObjectOperation::BufferUpdate
;
474 [&](const BufferUpdate::Write
&op
) {
476 fadvise_flags
|= op
.fadvise_flags
;
478 [&](const BufferUpdate::Zero
&) {
479 bl
.append_zero(extent
.get_len());
481 [&](const BufferUpdate::CloneRange
&) {
484 "CloneRange is not allowed, do_op should have returned ENOTSUPP");
487 uint64_t off
= extent
.get_off();
488 uint64_t len
= extent
.get_len();
489 uint64_t end
= off
+ len
;
490 ldpp_dout(dpp
, 20) << __func__
<< ": adding buffer_update "
491 << make_pair(off
, len
)
494 if (off
> new_size
) {
495 assert(off
> append_after
);
496 bl
.prepend_zero(off
- new_size
);
497 len
+= off
- new_size
;
498 ldpp_dout(dpp
, 20) << __func__
<< ": prepending zeroes to align "
499 << off
<< "->" << new_size
503 if (!sinfo
.logical_offset_is_stripe_aligned(end
) && (end
> append_after
)) {
504 uint64_t aligned_end
= sinfo
.logical_to_next_stripe_offset(
506 uint64_t tail
= aligned_end
- end
;
507 bl
.append_zero(tail
);
508 ldpp_dout(dpp
, 20) << __func__
<< ": appending zeroes to align end "
509 << end
<< "->" << end
+tail
510 << ", len: " << len
<< "->" << len
+tail
516 to_write
.insert(off
, len
, bl
);
522 op
.truncate
->second
> new_size
) {
523 assert(op
.truncate
->second
> append_after
);
524 uint64_t truncate_to
=
525 sinfo
.logical_to_next_stripe_offset(
526 op
.truncate
->second
);
527 uint64_t zeroes
= truncate_to
- new_size
;
529 bl
.append_zero(zeroes
);
534 new_size
= truncate_to
;
535 ldpp_dout(dpp
, 20) << __func__
<< ": truncating out to "
541 for (unsigned i
= 0; i
< ecimpl
->get_chunk_count(); ++i
) {
544 auto to_overwrite
= to_write
.intersect(0, append_after
);
545 ldpp_dout(dpp
, 20) << __func__
<< ": to_overwrite: "
548 for (auto &&extent
: to_overwrite
) {
549 assert(extent
.get_off() + extent
.get_len() <= append_after
);
550 assert(sinfo
.logical_offset_is_stripe_aligned(extent
.get_off()));
551 assert(sinfo
.logical_offset_is_stripe_aligned(extent
.get_len()));
553 uint64_t restore_from
= sinfo
.aligned_logical_offset_to_chunk_offset(
555 uint64_t restore_len
= sinfo
.aligned_logical_offset_to_chunk_offset(
557 ldpp_dout(dpp
, 20) << __func__
<< ": overwriting "
558 << restore_from
<< "~" << restore_len
560 if (rollback_extents
.empty()) {
561 for (auto &&st
: *transactions
) {
563 coll_t(spg_t(pgid
, st
.first
)),
564 ghobject_t(oid
, entry
->version
.version
, st
.first
));
567 rollback_extents
.emplace_back(make_pair(restore_from
, restore_len
));
568 for (auto &&st
: *transactions
) {
569 st
.second
.clone_range(
570 coll_t(spg_t(pgid
, st
.first
)),
571 ghobject_t(oid
, ghobject_t::NO_GEN
, st
.first
),
572 ghobject_t(oid
, entry
->version
.version
, st
.first
),
593 auto to_append
= to_write
.intersect(
595 std::numeric_limits
<uint64_t>::max() - append_after
);
596 ldpp_dout(dpp
, 20) << __func__
<< ": to_append: "
599 for (auto &&extent
: to_append
) {
600 assert(sinfo
.logical_offset_is_stripe_aligned(extent
.get_off()));
601 assert(sinfo
.logical_offset_is_stripe_aligned(extent
.get_len()));
602 ldpp_dout(dpp
, 20) << __func__
<< ": appending "
603 << extent
.get_off() << "~" << extent
.get_len()
620 ldpp_dout(dpp
, 20) << __func__
<< ": " << oid
621 << " resetting hinfo to logical size "
624 if (!rollback_extents
.empty() && entry
) {
626 ldpp_dout(dpp
, 20) << __func__
<< ": " << oid
627 << " marking rollback extents "
630 entry
->mod_desc
.rollback_extents(
631 entry
->version
.version
, rollback_extents
);
633 hinfo
->set_total_chunk_size_clear_hash(
634 sinfo
.aligned_logical_offset_to_chunk_offset(new_size
));
636 assert(hinfo
->get_total_logical_size(sinfo
) == new_size
);
639 if (entry
&& !to_append
.empty()) {
640 ldpp_dout(dpp
, 20) << __func__
<< ": marking append "
643 entry
->mod_desc
.append(append_after
);
646 if (!op
.is_delete()) {
648 ::encode(*hinfo
, hbuf
);
649 for (auto &&i
: *transactions
) {
651 coll_t(spg_t(pgid
, i
.first
)),
652 ghobject_t(oid
, ghobject_t::NO_GEN
, i
.first
),
653 ECUtil::get_hinfo_key(),