]> git.proxmox.com Git - ceph.git/blob - ceph/src/osd/ECTransaction.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / osd / ECTransaction.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank Storage, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <iostream>
16 #include <vector>
17 #include <sstream>
18
19 #include "ECTransaction.h"
20 #include "ECUtil.h"
21 #include "os/ObjectStore.h"
22 #include "common/inline_variant.h"
23
24
25 void encode_and_write(
26 pg_t pgid,
27 const hobject_t &oid,
28 const ECUtil::stripe_info_t &sinfo,
29 ErasureCodeInterfaceRef &ecimpl,
30 const set<int> &want,
31 uint64_t offset,
32 bufferlist bl,
33 uint32_t flags,
34 ECUtil::HashInfoRef hinfo,
35 extent_map &written,
36 map<shard_id_t, ObjectStore::Transaction> *transactions,
37 DoutPrefixProvider *dpp) {
38 const uint64_t before_size = hinfo->get_total_logical_size(sinfo);
39 ceph_assert(sinfo.logical_offset_is_stripe_aligned(offset));
40 ceph_assert(sinfo.logical_offset_is_stripe_aligned(bl.length()));
41 ceph_assert(bl.length());
42
43 map<int, bufferlist> buffers;
44 int r = ECUtil::encode(
45 sinfo, ecimpl, bl, want, &buffers);
46 ceph_assert(r == 0);
47
48 written.insert(offset, bl.length(), bl);
49
50 ldpp_dout(dpp, 20) << __func__ << ": " << oid
51 << " new_size "
52 << offset + bl.length()
53 << dendl;
54
55 if (offset >= before_size) {
56 ceph_assert(offset == before_size);
57 hinfo->append(
58 sinfo.aligned_logical_offset_to_chunk_offset(offset),
59 buffers);
60 }
61
62 for (auto &&i : *transactions) {
63 ceph_assert(buffers.count(i.first));
64 bufferlist &enc_bl = buffers[i.first];
65 if (offset >= before_size) {
66 i.second.set_alloc_hint(
67 coll_t(spg_t(pgid, i.first)),
68 ghobject_t(oid, ghobject_t::NO_GEN, i.first),
69 0, 0,
70 CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE |
71 CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY);
72 }
73 i.second.write(
74 coll_t(spg_t(pgid, i.first)),
75 ghobject_t(oid, ghobject_t::NO_GEN, i.first),
76 sinfo.logical_to_prev_chunk_offset(
77 offset),
78 enc_bl.length(),
79 enc_bl,
80 flags);
81 }
82 }
83
84 bool ECTransaction::requires_overwrite(
85 uint64_t prev_size,
86 const PGTransaction::ObjectOperation &op) {
87 // special handling for truncates to 0
88 if (op.truncate && op.truncate->first == 0)
89 return false;
90 return op.is_none() &&
91 ((!op.buffer_updates.empty() &&
92 (op.buffer_updates.begin().get_off() < prev_size)) ||
93 (op.truncate &&
94 (op.truncate->first < prev_size)));
95 }
96
97 void ECTransaction::generate_transactions(
98 WritePlan &plan,
99 ErasureCodeInterfaceRef &ecimpl,
100 pg_t pgid,
101 const ECUtil::stripe_info_t &sinfo,
102 const map<hobject_t,extent_map> &partial_extents,
103 vector<pg_log_entry_t> &entries,
104 map<hobject_t,extent_map> *written_map,
105 map<shard_id_t, ObjectStore::Transaction> *transactions,
106 set<hobject_t> *temp_added,
107 set<hobject_t> *temp_removed,
108 DoutPrefixProvider *dpp,
109 const ceph_release_t require_osd_release)
110 {
111 ceph_assert(written_map);
112 ceph_assert(transactions);
113 ceph_assert(temp_added);
114 ceph_assert(temp_removed);
115 ceph_assert(plan.t);
116 auto &t = *(plan.t);
117
118 auto &hash_infos = plan.hash_infos;
119
120 map<hobject_t, pg_log_entry_t*> obj_to_log;
121 for (auto &&i: entries) {
122 obj_to_log.insert(make_pair(i.soid, &i));
123 }
124
125 t.safe_create_traverse(
126 [&](pair<const hobject_t, PGTransaction::ObjectOperation> &opair) {
127 const hobject_t &oid = opair.first;
128 auto &op = opair.second;
129 auto &obc_map = t.obc_map;
130 auto &written = (*written_map)[oid];
131
132 auto iter = obj_to_log.find(oid);
133 pg_log_entry_t *entry = iter != obj_to_log.end() ? iter->second : nullptr;
134
135 ObjectContextRef obc;
136 auto obiter = t.obc_map.find(oid);
137 if (obiter != t.obc_map.end()) {
138 obc = obiter->second;
139 }
140 if (entry) {
141 ceph_assert(obc);
142 } else {
143 ceph_assert(oid.is_temp());
144 }
145
146 ECUtil::HashInfoRef hinfo;
147 {
148 auto iter = hash_infos.find(oid);
149 ceph_assert(iter != hash_infos.end());
150 hinfo = iter->second;
151 }
152
153 if (oid.is_temp()) {
154 if (op.is_fresh_object()) {
155 temp_added->insert(oid);
156 } else if (op.is_delete()) {
157 temp_removed->insert(oid);
158 }
159 }
160
161 if (entry &&
162 entry->is_modify() &&
163 op.updated_snaps) {
164 bufferlist bl(op.updated_snaps->second.size() * 8 + 8);
165 encode(op.updated_snaps->second, bl);
166 entry->snaps.swap(bl);
167 entry->snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
168 }
169
170 ldpp_dout(dpp, 20) << "generate_transactions: "
171 << opair.first
172 << ", current size is "
173 << hinfo->get_total_logical_size(sinfo)
174 << " buffers are "
175 << op.buffer_updates
176 << dendl;
177 if (op.truncate) {
178 ldpp_dout(dpp, 20) << "generate_transactions: "
179 << " truncate is "
180 << *(op.truncate)
181 << dendl;
182 }
183
184 if (entry && op.updated_snaps) {
185 entry->mod_desc.update_snaps(op.updated_snaps->first);
186 }
187
188 map<string, std::optional<bufferlist> > xattr_rollback;
189 ceph_assert(hinfo);
190 bufferlist old_hinfo;
191 encode(*hinfo, old_hinfo);
192 xattr_rollback[ECUtil::get_hinfo_key()] = old_hinfo;
193
194 if (op.is_none() && op.truncate && op.truncate->first == 0) {
195 ceph_assert(op.truncate->first == 0);
196 ceph_assert(op.truncate->first ==
197 op.truncate->second);
198 ceph_assert(entry);
199 ceph_assert(obc);
200
201 if (op.truncate->first != op.truncate->second) {
202 op.truncate->first = op.truncate->second;
203 } else {
204 op.truncate = std::nullopt;
205 }
206
207 op.delete_first = true;
208 op.init_type = PGTransaction::ObjectOperation::Init::Create();
209
210 if (obc) {
211 /* We need to reapply all of the cached xattrs.
212 * std::map insert fortunately only writes keys
213 * which don't already exist, so this should do
214 * the right thing. */
215 op.attr_updates.insert(
216 obc->attr_cache.begin(),
217 obc->attr_cache.end());
218 }
219 }
220
221 if (op.delete_first) {
222 /* We also want to remove the std::nullopt entries since
223 * the keys already won't exist */
224 for (auto j = op.attr_updates.begin();
225 j != op.attr_updates.end();
226 ) {
227 if (j->second) {
228 ++j;
229 } else {
230 op.attr_updates.erase(j++);
231 }
232 }
233 /* Fill in all current entries for xattr rollback */
234 if (obc) {
235 xattr_rollback.insert(
236 obc->attr_cache.begin(),
237 obc->attr_cache.end());
238 obc->attr_cache.clear();
239 }
240 if (entry) {
241 entry->mod_desc.rmobject(entry->version.version);
242 for (auto &&st: *transactions) {
243 st.second.collection_move_rename(
244 coll_t(spg_t(pgid, st.first)),
245 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
246 coll_t(spg_t(pgid, st.first)),
247 ghobject_t(oid, entry->version.version, st.first));
248 }
249 } else {
250 for (auto &&st: *transactions) {
251 st.second.remove(
252 coll_t(spg_t(pgid, st.first)),
253 ghobject_t(oid, ghobject_t::NO_GEN, st.first));
254 }
255 }
256 hinfo->clear();
257 }
258
259 if (op.is_fresh_object() && entry) {
260 entry->mod_desc.create();
261 }
262
263 match(
264 op.init_type,
265 [&](const PGTransaction::ObjectOperation::Init::None &) {},
266 [&](const PGTransaction::ObjectOperation::Init::Create &op) {
267 for (auto &&st: *transactions) {
268 if (require_osd_release >= ceph_release_t::octopus) {
269 st.second.create(
270 coll_t(spg_t(pgid, st.first)),
271 ghobject_t(oid, ghobject_t::NO_GEN, st.first));
272 } else {
273 st.second.touch(
274 coll_t(spg_t(pgid, st.first)),
275 ghobject_t(oid, ghobject_t::NO_GEN, st.first));
276 }
277 }
278 },
279 [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
280 for (auto &&st: *transactions) {
281 st.second.clone(
282 coll_t(spg_t(pgid, st.first)),
283 ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
284 ghobject_t(oid, ghobject_t::NO_GEN, st.first));
285 }
286
287 auto siter = hash_infos.find(op.source);
288 ceph_assert(siter != hash_infos.end());
289 hinfo->update_to(*(siter->second));
290
291 if (obc) {
292 auto cobciter = obc_map.find(op.source);
293 ceph_assert(cobciter != obc_map.end());
294 obc->attr_cache = cobciter->second->attr_cache;
295 }
296 },
297 [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
298 ceph_assert(op.source.is_temp());
299 for (auto &&st: *transactions) {
300 st.second.collection_move_rename(
301 coll_t(spg_t(pgid, st.first)),
302 ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
303 coll_t(spg_t(pgid, st.first)),
304 ghobject_t(oid, ghobject_t::NO_GEN, st.first));
305 }
306 auto siter = hash_infos.find(op.source);
307 ceph_assert(siter != hash_infos.end());
308 hinfo->update_to(*(siter->second));
309 if (obc) {
310 auto cobciter = obc_map.find(op.source);
311 ceph_assert(cobciter == obc_map.end());
312 obc->attr_cache.clear();
313 }
314 });
315
316 // omap not supported (except 0, handled above)
317 ceph_assert(!(op.clear_omap));
318 ceph_assert(!(op.omap_header));
319 ceph_assert(op.omap_updates.empty());
320
321 if (!op.attr_updates.empty()) {
322 map<string, bufferlist> to_set;
323 for (auto &&j: op.attr_updates) {
324 if (j.second) {
325 to_set[j.first] = *(j.second);
326 } else {
327 for (auto &&st : *transactions) {
328 st.second.rmattr(
329 coll_t(spg_t(pgid, st.first)),
330 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
331 j.first);
332 }
333 }
334 if (obc) {
335 auto citer = obc->attr_cache.find(j.first);
336 if (entry) {
337 if (citer != obc->attr_cache.end()) {
338 // won't overwrite anything we put in earlier
339 xattr_rollback.insert(
340 make_pair(
341 j.first,
342 std::optional<bufferlist>(citer->second)));
343 } else {
344 // won't overwrite anything we put in earlier
345 xattr_rollback.insert(
346 make_pair(
347 j.first,
348 std::nullopt));
349 }
350 }
351 if (j.second) {
352 obc->attr_cache[j.first] = *(j.second);
353 } else if (citer != obc->attr_cache.end()) {
354 obc->attr_cache.erase(citer);
355 }
356 } else {
357 ceph_assert(!entry);
358 }
359 }
360 for (auto &&st : *transactions) {
361 st.second.setattrs(
362 coll_t(spg_t(pgid, st.first)),
363 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
364 to_set);
365 }
366 ceph_assert(!xattr_rollback.empty());
367 }
368 if (entry && !xattr_rollback.empty()) {
369 entry->mod_desc.setattrs(xattr_rollback);
370 }
371
372 if (op.alloc_hint) {
373 /* logical_to_next_chunk_offset() scales down both aligned and
374 * unaligned offsets
375
376 * we don't bother to roll this back at this time for two reasons:
377 * 1) it's advisory
378 * 2) we don't track the old value */
379 uint64_t object_size = sinfo.logical_to_next_chunk_offset(
380 op.alloc_hint->expected_object_size);
381 uint64_t write_size = sinfo.logical_to_next_chunk_offset(
382 op.alloc_hint->expected_write_size);
383
384 for (auto &&st : *transactions) {
385 st.second.set_alloc_hint(
386 coll_t(spg_t(pgid, st.first)),
387 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
388 object_size,
389 write_size,
390 op.alloc_hint->flags);
391 }
392 }
393
394 extent_map to_write;
395 auto pextiter = partial_extents.find(oid);
396 if (pextiter != partial_extents.end()) {
397 to_write = pextiter->second;
398 }
399
400 vector<pair<uint64_t, uint64_t> > rollback_extents;
401 const uint64_t orig_size = hinfo->get_total_logical_size(sinfo);
402
403 uint64_t new_size = orig_size;
404 uint64_t append_after = new_size;
405 ldpp_dout(dpp, 20) << __func__ << ": new_size start " << new_size << dendl;
406 if (op.truncate && op.truncate->first < new_size) {
407 ceph_assert(!op.is_fresh_object());
408 new_size = sinfo.logical_to_next_stripe_offset(
409 op.truncate->first);
410 ldpp_dout(dpp, 20) << __func__ << ": new_size truncate down "
411 << new_size << dendl;
412 if (new_size != op.truncate->first) { // 0 the unaligned part
413 bufferlist bl;
414 bl.append_zero(new_size - op.truncate->first);
415 to_write.insert(
416 op.truncate->first,
417 bl.length(),
418 bl);
419 append_after = sinfo.logical_to_prev_stripe_offset(
420 op.truncate->first);
421 } else {
422 append_after = new_size;
423 }
424 to_write.erase(
425 new_size,
426 std::numeric_limits<uint64_t>::max() - new_size);
427
428 if (entry && !op.is_fresh_object()) {
429 uint64_t restore_from = sinfo.logical_to_prev_chunk_offset(
430 op.truncate->first);
431 uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset(
432 orig_size -
433 sinfo.logical_to_prev_stripe_offset(op.truncate->first));
434 ceph_assert(rollback_extents.empty());
435
436 ldpp_dout(dpp, 20) << __func__ << ": saving extent "
437 << make_pair(restore_from, restore_len)
438 << dendl;
439 ldpp_dout(dpp, 20) << __func__ << ": truncating to "
440 << new_size
441 << dendl;
442 rollback_extents.emplace_back(
443 make_pair(restore_from, restore_len));
444 for (auto &&st : *transactions) {
445 st.second.touch(
446 coll_t(spg_t(pgid, st.first)),
447 ghobject_t(oid, entry->version.version, st.first));
448 st.second.clone_range(
449 coll_t(spg_t(pgid, st.first)),
450 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
451 ghobject_t(oid, entry->version.version, st.first),
452 restore_from,
453 restore_len,
454 restore_from);
455
456 }
457 } else {
458 ldpp_dout(dpp, 20) << __func__ << ": not saving extents, fresh object"
459 << dendl;
460 }
461 for (auto &&st : *transactions) {
462 st.second.truncate(
463 coll_t(spg_t(pgid, st.first)),
464 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
465 sinfo.aligned_logical_offset_to_chunk_offset(new_size));
466 }
467 }
468
469 uint32_t fadvise_flags = 0;
470 for (auto &&extent: op.buffer_updates) {
471 using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
472 bufferlist bl;
473 match(
474 extent.get_val(),
475 [&](const BufferUpdate::Write &op) {
476 bl = op.buffer;
477 fadvise_flags |= op.fadvise_flags;
478 },
479 [&](const BufferUpdate::Zero &) {
480 bl.append_zero(extent.get_len());
481 },
482 [&](const BufferUpdate::CloneRange &) {
483 ceph_assert(
484 0 ==
485 "CloneRange is not allowed, do_op should have returned ENOTSUPP");
486 });
487
488 uint64_t off = extent.get_off();
489 uint64_t len = extent.get_len();
490 uint64_t end = off + len;
491 ldpp_dout(dpp, 20) << __func__ << ": adding buffer_update "
492 << make_pair(off, len)
493 << dendl;
494 ceph_assert(len > 0);
495 if (off > new_size) {
496 ceph_assert(off > append_after);
497 bl.prepend_zero(off - new_size);
498 len += off - new_size;
499 ldpp_dout(dpp, 20) << __func__ << ": prepending zeroes to align "
500 << off << "->" << new_size
501 << dendl;
502 off = new_size;
503 }
504 if (!sinfo.logical_offset_is_stripe_aligned(end) && (end > append_after)) {
505 uint64_t aligned_end = sinfo.logical_to_next_stripe_offset(
506 end);
507 uint64_t tail = aligned_end - end;
508 bl.append_zero(tail);
509 ldpp_dout(dpp, 20) << __func__ << ": appending zeroes to align end "
510 << end << "->" << end+tail
511 << ", len: " << len << "->" << len+tail
512 << dendl;
513 end += tail;
514 len += tail;
515 }
516
517 to_write.insert(off, len, bl);
518 if (end > new_size)
519 new_size = end;
520 }
521
522 if (op.truncate &&
523 op.truncate->second > new_size) {
524 ceph_assert(op.truncate->second > append_after);
525 uint64_t truncate_to =
526 sinfo.logical_to_next_stripe_offset(
527 op.truncate->second);
528 uint64_t zeroes = truncate_to - new_size;
529 bufferlist bl;
530 bl.append_zero(zeroes);
531 to_write.insert(
532 new_size,
533 zeroes,
534 bl);
535 new_size = truncate_to;
536 ldpp_dout(dpp, 20) << __func__ << ": truncating out to "
537 << truncate_to
538 << dendl;
539 }
540
541 set<int> want;
542 for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) {
543 want.insert(i);
544 }
545 auto to_overwrite = to_write.intersect(0, append_after);
546 ldpp_dout(dpp, 20) << __func__ << ": to_overwrite: "
547 << to_overwrite
548 << dendl;
549 for (auto &&extent: to_overwrite) {
550 ceph_assert(extent.get_off() + extent.get_len() <= append_after);
551 ceph_assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
552 ceph_assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
553 if (entry) {
554 uint64_t restore_from = sinfo.aligned_logical_offset_to_chunk_offset(
555 extent.get_off());
556 uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset(
557 extent.get_len());
558 ldpp_dout(dpp, 20) << __func__ << ": overwriting "
559 << restore_from << "~" << restore_len
560 << dendl;
561 if (rollback_extents.empty()) {
562 for (auto &&st : *transactions) {
563 st.second.touch(
564 coll_t(spg_t(pgid, st.first)),
565 ghobject_t(oid, entry->version.version, st.first));
566 }
567 }
568 rollback_extents.emplace_back(make_pair(restore_from, restore_len));
569 for (auto &&st : *transactions) {
570 st.second.clone_range(
571 coll_t(spg_t(pgid, st.first)),
572 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
573 ghobject_t(oid, entry->version.version, st.first),
574 restore_from,
575 restore_len,
576 restore_from);
577 }
578 }
579 encode_and_write(
580 pgid,
581 oid,
582 sinfo,
583 ecimpl,
584 want,
585 extent.get_off(),
586 extent.get_val(),
587 fadvise_flags,
588 hinfo,
589 written,
590 transactions,
591 dpp);
592 }
593
594 auto to_append = to_write.intersect(
595 append_after,
596 std::numeric_limits<uint64_t>::max() - append_after);
597 ldpp_dout(dpp, 20) << __func__ << ": to_append: "
598 << to_append
599 << dendl;
600 for (auto &&extent: to_append) {
601 ceph_assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
602 ceph_assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
603 ldpp_dout(dpp, 20) << __func__ << ": appending "
604 << extent.get_off() << "~" << extent.get_len()
605 << dendl;
606 encode_and_write(
607 pgid,
608 oid,
609 sinfo,
610 ecimpl,
611 want,
612 extent.get_off(),
613 extent.get_val(),
614 fadvise_flags,
615 hinfo,
616 written,
617 transactions,
618 dpp);
619 }
620
621 ldpp_dout(dpp, 20) << __func__ << ": " << oid
622 << " resetting hinfo to logical size "
623 << new_size
624 << dendl;
625 if (!rollback_extents.empty() && entry) {
626 if (entry) {
627 ldpp_dout(dpp, 20) << __func__ << ": " << oid
628 << " marking rollback extents "
629 << rollback_extents
630 << dendl;
631 entry->mod_desc.rollback_extents(
632 entry->version.version, rollback_extents);
633 }
634 hinfo->set_total_chunk_size_clear_hash(
635 sinfo.aligned_logical_offset_to_chunk_offset(new_size));
636 } else {
637 ceph_assert(hinfo->get_total_logical_size(sinfo) == new_size);
638 }
639
640 if (entry && !to_append.empty()) {
641 ldpp_dout(dpp, 20) << __func__ << ": marking append "
642 << append_after
643 << dendl;
644 entry->mod_desc.append(append_after);
645 }
646
647 if (!op.is_delete()) {
648 bufferlist hbuf;
649 encode(*hinfo, hbuf);
650 for (auto &&i : *transactions) {
651 i.second.setattr(
652 coll_t(spg_t(pgid, i.first)),
653 ghobject_t(oid, ghobject_t::NO_GEN, i.first),
654 ECUtil::get_hinfo_key(),
655 hbuf);
656 }
657 }
658 });
659 }