]> git.proxmox.com Git - ceph.git/blob - ceph/src/osd/ECTransaction.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / osd / ECTransaction.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank Storage, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <iostream>
16 #include <vector>
17 #include <sstream>
18
19 #include "ECTransaction.h"
20 #include "ECUtil.h"
21 #include "os/ObjectStore.h"
22 #include "common/inline_variant.h"
23
24 using std::less;
25 using std::make_pair;
26 using std::map;
27 using std::pair;
28 using std::set;
29 using std::string;
30 using std::vector;
31
32 using ceph::bufferlist;
33 using ceph::decode;
34 using ceph::encode;
35 using ceph::ErasureCodeInterfaceRef;
36
37 void encode_and_write(
38 pg_t pgid,
39 const hobject_t &oid,
40 const ECUtil::stripe_info_t &sinfo,
41 ErasureCodeInterfaceRef &ecimpl,
42 const set<int> &want,
43 uint64_t offset,
44 bufferlist bl,
45 uint32_t flags,
46 ECUtil::HashInfoRef hinfo,
47 extent_map &written,
48 map<shard_id_t, ObjectStore::Transaction> *transactions,
49 DoutPrefixProvider *dpp) {
50 const uint64_t before_size = hinfo->get_total_logical_size(sinfo);
51 ceph_assert(sinfo.logical_offset_is_stripe_aligned(offset));
52 ceph_assert(sinfo.logical_offset_is_stripe_aligned(bl.length()));
53 ceph_assert(bl.length());
54
55 map<int, bufferlist> buffers;
56 int r = ECUtil::encode(
57 sinfo, ecimpl, bl, want, &buffers);
58 ceph_assert(r == 0);
59
60 written.insert(offset, bl.length(), bl);
61
62 ldpp_dout(dpp, 20) << __func__ << ": " << oid
63 << " new_size "
64 << offset + bl.length()
65 << dendl;
66
67 if (offset >= before_size) {
68 ceph_assert(offset == before_size);
69 hinfo->append(
70 sinfo.aligned_logical_offset_to_chunk_offset(offset),
71 buffers);
72 }
73
74 for (auto &&i : *transactions) {
75 ceph_assert(buffers.count(i.first));
76 bufferlist &enc_bl = buffers[i.first];
77 if (offset >= before_size) {
78 i.second.set_alloc_hint(
79 coll_t(spg_t(pgid, i.first)),
80 ghobject_t(oid, ghobject_t::NO_GEN, i.first),
81 0, 0,
82 CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE |
83 CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY);
84 }
85 i.second.write(
86 coll_t(spg_t(pgid, i.first)),
87 ghobject_t(oid, ghobject_t::NO_GEN, i.first),
88 sinfo.logical_to_prev_chunk_offset(
89 offset),
90 enc_bl.length(),
91 enc_bl,
92 flags);
93 }
94 }
95
96 bool ECTransaction::requires_overwrite(
97 uint64_t prev_size,
98 const PGTransaction::ObjectOperation &op) {
99 // special handling for truncates to 0
100 if (op.truncate && op.truncate->first == 0)
101 return false;
102 return op.is_none() &&
103 ((!op.buffer_updates.empty() &&
104 (op.buffer_updates.begin().get_off() < prev_size)) ||
105 (op.truncate &&
106 (op.truncate->first < prev_size)));
107 }
108
109 void ECTransaction::generate_transactions(
110 WritePlan &plan,
111 ErasureCodeInterfaceRef &ecimpl,
112 pg_t pgid,
113 const ECUtil::stripe_info_t &sinfo,
114 const map<hobject_t,extent_map> &partial_extents,
115 vector<pg_log_entry_t> &entries,
116 map<hobject_t,extent_map> *written_map,
117 map<shard_id_t, ObjectStore::Transaction> *transactions,
118 set<hobject_t> *temp_added,
119 set<hobject_t> *temp_removed,
120 DoutPrefixProvider *dpp,
121 const ceph_release_t require_osd_release)
122 {
123 ceph_assert(written_map);
124 ceph_assert(transactions);
125 ceph_assert(temp_added);
126 ceph_assert(temp_removed);
127 ceph_assert(plan.t);
128 auto &t = *(plan.t);
129
130 auto &hash_infos = plan.hash_infos;
131
132 map<hobject_t, pg_log_entry_t*> obj_to_log;
133 for (auto &&i: entries) {
134 obj_to_log.insert(make_pair(i.soid, &i));
135 }
136
137 t.safe_create_traverse(
138 [&](pair<const hobject_t, PGTransaction::ObjectOperation> &opair) {
139 const hobject_t &oid = opair.first;
140 auto &op = opair.second;
141 auto &obc_map = t.obc_map;
142 auto &written = (*written_map)[oid];
143
144 auto iter = obj_to_log.find(oid);
145 pg_log_entry_t *entry = iter != obj_to_log.end() ? iter->second : nullptr;
146
147 ObjectContextRef obc;
148 auto obiter = t.obc_map.find(oid);
149 if (obiter != t.obc_map.end()) {
150 obc = obiter->second;
151 }
152 if (entry) {
153 ceph_assert(obc);
154 } else {
155 ceph_assert(oid.is_temp());
156 }
157
158 ECUtil::HashInfoRef hinfo;
159 {
160 auto iter = hash_infos.find(oid);
161 ceph_assert(iter != hash_infos.end());
162 hinfo = iter->second;
163 }
164
165 if (oid.is_temp()) {
166 if (op.is_fresh_object()) {
167 temp_added->insert(oid);
168 } else if (op.is_delete()) {
169 temp_removed->insert(oid);
170 }
171 }
172
173 if (entry &&
174 entry->is_modify() &&
175 op.updated_snaps) {
176 bufferlist bl(op.updated_snaps->second.size() * 8 + 8);
177 encode(op.updated_snaps->second, bl);
178 entry->snaps.swap(bl);
179 entry->snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
180 }
181
182 ldpp_dout(dpp, 20) << "generate_transactions: "
183 << opair.first
184 << ", current size is "
185 << hinfo->get_total_logical_size(sinfo)
186 << " buffers are "
187 << op.buffer_updates
188 << dendl;
189 if (op.truncate) {
190 ldpp_dout(dpp, 20) << "generate_transactions: "
191 << " truncate is "
192 << *(op.truncate)
193 << dendl;
194 }
195
196 if (entry && op.updated_snaps) {
197 entry->mod_desc.update_snaps(op.updated_snaps->first);
198 }
199
200 map<string, std::optional<bufferlist> > xattr_rollback;
201 ceph_assert(hinfo);
202 bufferlist old_hinfo;
203 encode(*hinfo, old_hinfo);
204 xattr_rollback[ECUtil::get_hinfo_key()] = old_hinfo;
205
206 if (op.is_none() && op.truncate && op.truncate->first == 0) {
207 ceph_assert(op.truncate->first == 0);
208 ceph_assert(op.truncate->first ==
209 op.truncate->second);
210 ceph_assert(entry);
211 ceph_assert(obc);
212
213 if (op.truncate->first != op.truncate->second) {
214 op.truncate->first = op.truncate->second;
215 } else {
216 op.truncate = std::nullopt;
217 }
218
219 op.delete_first = true;
220 op.init_type = PGTransaction::ObjectOperation::Init::Create();
221
222 if (obc) {
223 /* We need to reapply all of the cached xattrs.
224 * std::map insert fortunately only writes keys
225 * which don't already exist, so this should do
226 * the right thing. */
227 op.attr_updates.insert(
228 obc->attr_cache.begin(),
229 obc->attr_cache.end());
230 }
231 }
232
233 if (op.delete_first) {
234 /* We also want to remove the std::nullopt entries since
235 * the keys already won't exist */
236 for (auto j = op.attr_updates.begin();
237 j != op.attr_updates.end();
238 ) {
239 if (j->second) {
240 ++j;
241 } else {
242 op.attr_updates.erase(j++);
243 }
244 }
245 /* Fill in all current entries for xattr rollback */
246 if (obc) {
247 xattr_rollback.insert(
248 obc->attr_cache.begin(),
249 obc->attr_cache.end());
250 obc->attr_cache.clear();
251 }
252 if (entry) {
253 entry->mod_desc.rmobject(entry->version.version);
254 for (auto &&st: *transactions) {
255 st.second.collection_move_rename(
256 coll_t(spg_t(pgid, st.first)),
257 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
258 coll_t(spg_t(pgid, st.first)),
259 ghobject_t(oid, entry->version.version, st.first));
260 }
261 } else {
262 for (auto &&st: *transactions) {
263 st.second.remove(
264 coll_t(spg_t(pgid, st.first)),
265 ghobject_t(oid, ghobject_t::NO_GEN, st.first));
266 }
267 }
268 hinfo->clear();
269 }
270
271 if (op.is_fresh_object() && entry) {
272 entry->mod_desc.create();
273 }
274
275 match(
276 op.init_type,
277 [&](const PGTransaction::ObjectOperation::Init::None &) {},
278 [&](const PGTransaction::ObjectOperation::Init::Create &op) {
279 for (auto &&st: *transactions) {
280 if (require_osd_release >= ceph_release_t::octopus) {
281 st.second.create(
282 coll_t(spg_t(pgid, st.first)),
283 ghobject_t(oid, ghobject_t::NO_GEN, st.first));
284 } else {
285 st.second.touch(
286 coll_t(spg_t(pgid, st.first)),
287 ghobject_t(oid, ghobject_t::NO_GEN, st.first));
288 }
289 }
290 },
291 [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
292 for (auto &&st: *transactions) {
293 st.second.clone(
294 coll_t(spg_t(pgid, st.first)),
295 ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
296 ghobject_t(oid, ghobject_t::NO_GEN, st.first));
297 }
298
299 auto siter = hash_infos.find(op.source);
300 ceph_assert(siter != hash_infos.end());
301 hinfo->update_to(*(siter->second));
302
303 if (obc) {
304 auto cobciter = obc_map.find(op.source);
305 ceph_assert(cobciter != obc_map.end());
306 obc->attr_cache = cobciter->second->attr_cache;
307 }
308 },
309 [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
310 ceph_assert(op.source.is_temp());
311 for (auto &&st: *transactions) {
312 st.second.collection_move_rename(
313 coll_t(spg_t(pgid, st.first)),
314 ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
315 coll_t(spg_t(pgid, st.first)),
316 ghobject_t(oid, ghobject_t::NO_GEN, st.first));
317 }
318 auto siter = hash_infos.find(op.source);
319 ceph_assert(siter != hash_infos.end());
320 hinfo->update_to(*(siter->second));
321 if (obc) {
322 auto cobciter = obc_map.find(op.source);
323 ceph_assert(cobciter == obc_map.end());
324 obc->attr_cache.clear();
325 }
326 });
327
328 // omap not supported (except 0, handled above)
329 ceph_assert(!(op.clear_omap));
330 ceph_assert(!(op.omap_header));
331 ceph_assert(op.omap_updates.empty());
332
333 if (!op.attr_updates.empty()) {
334 map<string, bufferlist, less<>> to_set;
335 for (auto &&j: op.attr_updates) {
336 if (j.second) {
337 to_set[j.first] = *(j.second);
338 } else {
339 for (auto &&st : *transactions) {
340 st.second.rmattr(
341 coll_t(spg_t(pgid, st.first)),
342 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
343 j.first);
344 }
345 }
346 if (obc) {
347 auto citer = obc->attr_cache.find(j.first);
348 if (entry) {
349 if (citer != obc->attr_cache.end()) {
350 // won't overwrite anything we put in earlier
351 xattr_rollback.insert(
352 make_pair(
353 j.first,
354 std::optional<bufferlist>(citer->second)));
355 } else {
356 // won't overwrite anything we put in earlier
357 xattr_rollback.insert(
358 make_pair(
359 j.first,
360 std::nullopt));
361 }
362 }
363 if (j.second) {
364 obc->attr_cache[j.first] = *(j.second);
365 } else if (citer != obc->attr_cache.end()) {
366 obc->attr_cache.erase(citer);
367 }
368 } else {
369 ceph_assert(!entry);
370 }
371 }
372 for (auto &&st : *transactions) {
373 st.second.setattrs(
374 coll_t(spg_t(pgid, st.first)),
375 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
376 to_set);
377 }
378 ceph_assert(!xattr_rollback.empty());
379 }
380 if (entry && !xattr_rollback.empty()) {
381 entry->mod_desc.setattrs(xattr_rollback);
382 }
383
384 if (op.alloc_hint) {
385 /* logical_to_next_chunk_offset() scales down both aligned and
386 * unaligned offsets
387
388 * we don't bother to roll this back at this time for two reasons:
389 * 1) it's advisory
390 * 2) we don't track the old value */
391 uint64_t object_size = sinfo.logical_to_next_chunk_offset(
392 op.alloc_hint->expected_object_size);
393 uint64_t write_size = sinfo.logical_to_next_chunk_offset(
394 op.alloc_hint->expected_write_size);
395
396 for (auto &&st : *transactions) {
397 st.second.set_alloc_hint(
398 coll_t(spg_t(pgid, st.first)),
399 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
400 object_size,
401 write_size,
402 op.alloc_hint->flags);
403 }
404 }
405
406 extent_map to_write;
407 auto pextiter = partial_extents.find(oid);
408 if (pextiter != partial_extents.end()) {
409 to_write = pextiter->second;
410 }
411
412 vector<pair<uint64_t, uint64_t> > rollback_extents;
413 const uint64_t orig_size = hinfo->get_total_logical_size(sinfo);
414
415 uint64_t new_size = orig_size;
416 uint64_t append_after = new_size;
417 ldpp_dout(dpp, 20) << __func__ << ": new_size start " << new_size << dendl;
418 if (op.truncate && op.truncate->first < new_size) {
419 ceph_assert(!op.is_fresh_object());
420 new_size = sinfo.logical_to_next_stripe_offset(
421 op.truncate->first);
422 ldpp_dout(dpp, 20) << __func__ << ": new_size truncate down "
423 << new_size << dendl;
424 if (new_size != op.truncate->first) { // 0 the unaligned part
425 bufferlist bl;
426 bl.append_zero(new_size - op.truncate->first);
427 to_write.insert(
428 op.truncate->first,
429 bl.length(),
430 bl);
431 append_after = sinfo.logical_to_prev_stripe_offset(
432 op.truncate->first);
433 } else {
434 append_after = new_size;
435 }
436 to_write.erase(
437 new_size,
438 std::numeric_limits<uint64_t>::max() - new_size);
439
440 if (entry && !op.is_fresh_object()) {
441 uint64_t restore_from = sinfo.logical_to_prev_chunk_offset(
442 op.truncate->first);
443 uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset(
444 orig_size -
445 sinfo.logical_to_prev_stripe_offset(op.truncate->first));
446 ceph_assert(rollback_extents.empty());
447
448 ldpp_dout(dpp, 20) << __func__ << ": saving extent "
449 << make_pair(restore_from, restore_len)
450 << dendl;
451 ldpp_dout(dpp, 20) << __func__ << ": truncating to "
452 << new_size
453 << dendl;
454 rollback_extents.emplace_back(
455 make_pair(restore_from, restore_len));
456 for (auto &&st : *transactions) {
457 st.second.touch(
458 coll_t(spg_t(pgid, st.first)),
459 ghobject_t(oid, entry->version.version, st.first));
460 st.second.clone_range(
461 coll_t(spg_t(pgid, st.first)),
462 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
463 ghobject_t(oid, entry->version.version, st.first),
464 restore_from,
465 restore_len,
466 restore_from);
467
468 }
469 } else {
470 ldpp_dout(dpp, 20) << __func__ << ": not saving extents, fresh object"
471 << dendl;
472 }
473 for (auto &&st : *transactions) {
474 st.second.truncate(
475 coll_t(spg_t(pgid, st.first)),
476 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
477 sinfo.aligned_logical_offset_to_chunk_offset(new_size));
478 }
479 }
480
481 uint32_t fadvise_flags = 0;
482 for (auto &&extent: op.buffer_updates) {
483 using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
484 bufferlist bl;
485 match(
486 extent.get_val(),
487 [&](const BufferUpdate::Write &op) {
488 bl = op.buffer;
489 fadvise_flags |= op.fadvise_flags;
490 },
491 [&](const BufferUpdate::Zero &) {
492 bl.append_zero(extent.get_len());
493 },
494 [&](const BufferUpdate::CloneRange &) {
495 ceph_assert(
496 0 ==
497 "CloneRange is not allowed, do_op should have returned ENOTSUPP");
498 });
499
500 uint64_t off = extent.get_off();
501 uint64_t len = extent.get_len();
502 uint64_t end = off + len;
503 ldpp_dout(dpp, 20) << __func__ << ": adding buffer_update "
504 << make_pair(off, len)
505 << dendl;
506 ceph_assert(len > 0);
507 if (off > new_size) {
508 ceph_assert(off > append_after);
509 bl.prepend_zero(off - new_size);
510 len += off - new_size;
511 ldpp_dout(dpp, 20) << __func__ << ": prepending zeroes to align "
512 << off << "->" << new_size
513 << dendl;
514 off = new_size;
515 }
516 if (!sinfo.logical_offset_is_stripe_aligned(end) && (end > append_after)) {
517 uint64_t aligned_end = sinfo.logical_to_next_stripe_offset(
518 end);
519 uint64_t tail = aligned_end - end;
520 bl.append_zero(tail);
521 ldpp_dout(dpp, 20) << __func__ << ": appending zeroes to align end "
522 << end << "->" << end+tail
523 << ", len: " << len << "->" << len+tail
524 << dendl;
525 end += tail;
526 len += tail;
527 }
528
529 to_write.insert(off, len, bl);
530 if (end > new_size)
531 new_size = end;
532 }
533
534 if (op.truncate &&
535 op.truncate->second > new_size) {
536 ceph_assert(op.truncate->second > append_after);
537 uint64_t truncate_to =
538 sinfo.logical_to_next_stripe_offset(
539 op.truncate->second);
540 uint64_t zeroes = truncate_to - new_size;
541 bufferlist bl;
542 bl.append_zero(zeroes);
543 to_write.insert(
544 new_size,
545 zeroes,
546 bl);
547 new_size = truncate_to;
548 ldpp_dout(dpp, 20) << __func__ << ": truncating out to "
549 << truncate_to
550 << dendl;
551 }
552
553 set<int> want;
554 for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) {
555 want.insert(i);
556 }
557 auto to_overwrite = to_write.intersect(0, append_after);
558 ldpp_dout(dpp, 20) << __func__ << ": to_overwrite: "
559 << to_overwrite
560 << dendl;
561 for (auto &&extent: to_overwrite) {
562 ceph_assert(extent.get_off() + extent.get_len() <= append_after);
563 ceph_assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
564 ceph_assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
565 if (entry) {
566 uint64_t restore_from = sinfo.aligned_logical_offset_to_chunk_offset(
567 extent.get_off());
568 uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset(
569 extent.get_len());
570 ldpp_dout(dpp, 20) << __func__ << ": overwriting "
571 << restore_from << "~" << restore_len
572 << dendl;
573 if (rollback_extents.empty()) {
574 for (auto &&st : *transactions) {
575 st.second.touch(
576 coll_t(spg_t(pgid, st.first)),
577 ghobject_t(oid, entry->version.version, st.first));
578 }
579 }
580 rollback_extents.emplace_back(make_pair(restore_from, restore_len));
581 for (auto &&st : *transactions) {
582 st.second.clone_range(
583 coll_t(spg_t(pgid, st.first)),
584 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
585 ghobject_t(oid, entry->version.version, st.first),
586 restore_from,
587 restore_len,
588 restore_from);
589 }
590 }
591 encode_and_write(
592 pgid,
593 oid,
594 sinfo,
595 ecimpl,
596 want,
597 extent.get_off(),
598 extent.get_val(),
599 fadvise_flags,
600 hinfo,
601 written,
602 transactions,
603 dpp);
604 }
605
606 auto to_append = to_write.intersect(
607 append_after,
608 std::numeric_limits<uint64_t>::max() - append_after);
609 ldpp_dout(dpp, 20) << __func__ << ": to_append: "
610 << to_append
611 << dendl;
612 for (auto &&extent: to_append) {
613 ceph_assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
614 ceph_assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
615 ldpp_dout(dpp, 20) << __func__ << ": appending "
616 << extent.get_off() << "~" << extent.get_len()
617 << dendl;
618 encode_and_write(
619 pgid,
620 oid,
621 sinfo,
622 ecimpl,
623 want,
624 extent.get_off(),
625 extent.get_val(),
626 fadvise_flags,
627 hinfo,
628 written,
629 transactions,
630 dpp);
631 }
632
633 ldpp_dout(dpp, 20) << __func__ << ": " << oid
634 << " resetting hinfo to logical size "
635 << new_size
636 << dendl;
637 if (!rollback_extents.empty() && entry) {
638 if (entry) {
639 ldpp_dout(dpp, 20) << __func__ << ": " << oid
640 << " marking rollback extents "
641 << rollback_extents
642 << dendl;
643 entry->mod_desc.rollback_extents(
644 entry->version.version, rollback_extents);
645 }
646 hinfo->set_total_chunk_size_clear_hash(
647 sinfo.aligned_logical_offset_to_chunk_offset(new_size));
648 } else {
649 ceph_assert(hinfo->get_total_logical_size(sinfo) == new_size);
650 }
651
652 if (entry && !to_append.empty()) {
653 ldpp_dout(dpp, 20) << __func__ << ": marking append "
654 << append_after
655 << dendl;
656 entry->mod_desc.append(append_after);
657 }
658
659 if (!op.is_delete()) {
660 bufferlist hbuf;
661 encode(*hinfo, hbuf);
662 for (auto &&i : *transactions) {
663 i.second.setattr(
664 coll_t(spg_t(pgid, i.first)),
665 ghobject_t(oid, ghobject_t::NO_GEN, i.first),
666 ECUtil::get_hinfo_key(),
667 hbuf);
668 }
669 }
670 });
671 }