]> git.proxmox.com Git - ceph.git/blame - ceph/src/osd/ECTransaction.cc
update sources to v12.1.0
[ceph.git] / ceph / src / osd / ECTransaction.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank Storage, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <iostream>
16#include <vector>
17#include <vector>
18#include <sstream>
19
20#include "ECTransaction.h"
21#include "ECUtil.h"
22#include "os/ObjectStore.h"
23#include "common/inline_variant.h"
24
25
26void encode_and_write(
27 pg_t pgid,
28 const hobject_t &oid,
29 const ECUtil::stripe_info_t &sinfo,
30 ErasureCodeInterfaceRef &ecimpl,
31 const set<int> &want,
32 uint64_t offset,
33 bufferlist bl,
34 uint32_t flags,
35 ECUtil::HashInfoRef hinfo,
36 extent_map &written,
37 map<shard_id_t, ObjectStore::Transaction> *transactions,
38 DoutPrefixProvider *dpp) {
39 const uint64_t before_size = hinfo->get_total_logical_size(sinfo);
40 assert(sinfo.logical_offset_is_stripe_aligned(offset));
41 assert(sinfo.logical_offset_is_stripe_aligned(bl.length()));
42 assert(bl.length());
43
44 map<int, bufferlist> buffers;
45 int r = ECUtil::encode(
46 sinfo, ecimpl, bl, want, &buffers);
47 assert(r == 0);
48
49 written.insert(offset, bl.length(), bl);
50
51 ldpp_dout(dpp, 20) << __func__ << ": " << oid
52 << " new_size "
53 << offset + bl.length()
54 << dendl;
55
56 if (offset >= before_size) {
57 assert(offset == before_size);
58 hinfo->append(
59 sinfo.aligned_logical_offset_to_chunk_offset(offset),
60 buffers);
61 }
62
63 for (auto &&i : *transactions) {
64 assert(buffers.count(i.first));
65 bufferlist &enc_bl = buffers[i.first];
66 if (offset >= before_size) {
67 i.second.set_alloc_hint(
68 coll_t(spg_t(pgid, i.first)),
69 ghobject_t(oid, ghobject_t::NO_GEN, i.first),
70 0, 0,
71 CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE |
72 CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY);
73 }
74 i.second.write(
75 coll_t(spg_t(pgid, i.first)),
76 ghobject_t(oid, ghobject_t::NO_GEN, i.first),
77 sinfo.logical_to_prev_chunk_offset(
78 offset),
79 enc_bl.length(),
80 enc_bl,
81 flags);
82 }
83}
84
85bool ECTransaction::requires_overwrite(
86 uint64_t prev_size,
87 const PGTransaction::ObjectOperation &op) {
88 // special handling for truncates to 0
89 if (op.truncate && op.truncate->first == 0)
90 return false;
91 return op.is_none() &&
92 ((!op.buffer_updates.empty() &&
93 (op.buffer_updates.begin().get_off() < prev_size)) ||
94 (op.truncate &&
95 (op.truncate->first < prev_size)));
96}
97
98void ECTransaction::generate_transactions(
99 WritePlan &plan,
100 ErasureCodeInterfaceRef &ecimpl,
101 pg_t pgid,
102 bool legacy_log_entries,
103 const ECUtil::stripe_info_t &sinfo,
104 const map<hobject_t,extent_map> &partial_extents,
105 vector<pg_log_entry_t> &entries,
106 map<hobject_t,extent_map> *written_map,
107 map<shard_id_t, ObjectStore::Transaction> *transactions,
108 set<hobject_t> *temp_added,
109 set<hobject_t> *temp_removed,
110 DoutPrefixProvider *dpp)
111{
112 assert(written_map);
113 assert(transactions);
114 assert(temp_added);
115 assert(temp_removed);
116 assert(plan.t);
117 auto &t = *(plan.t);
118
119 auto &hash_infos = plan.hash_infos;
120
121 assert(transactions);
122 assert(temp_added);
123 assert(temp_removed);
124
125 map<hobject_t, pg_log_entry_t*> obj_to_log;
126 for (auto &&i: entries) {
127 obj_to_log.insert(make_pair(i.soid, &i));
128 }
129
130 t.safe_create_traverse(
131 [&](pair<const hobject_t, PGTransaction::ObjectOperation> &opair) {
132 const hobject_t &oid = opair.first;
133 auto &op = opair.second;
134 auto &obc_map = t.obc_map;
135 auto &written = (*written_map)[oid];
136
137 auto iter = obj_to_log.find(oid);
138 pg_log_entry_t *entry = iter != obj_to_log.end() ? iter->second : nullptr;
139
140 ObjectContextRef obc;
141 auto obiter = t.obc_map.find(oid);
142 if (obiter != t.obc_map.end()) {
143 obc = obiter->second;
144 }
145 if (entry) {
146 assert(obc);
147 } else {
148 assert(oid.is_temp());
149 }
150
151 ECUtil::HashInfoRef hinfo;
152 {
153 auto iter = hash_infos.find(oid);
154 assert(iter != hash_infos.end());
155 hinfo = iter->second;
156 }
157
158 if (oid.is_temp()) {
159 if (op.is_fresh_object()) {
160 temp_added->insert(oid);
161 } else if (op.is_delete()) {
162 temp_removed->insert(oid);
163 }
164 }
165
166 if (entry &&
167 entry->is_modify() &&
168 op.updated_snaps) {
31f18b77
FG
169 bufferlist bl(op.updated_snaps->second.size() * 8 + 8);
170 ::encode(op.updated_snaps->second, bl);
171 entry->snaps.swap(bl);
172 entry->snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
7c673cae
FG
173 }
174
175 ldpp_dout(dpp, 20) << "generate_transactions: "
176 << opair.first
177 << ", current size is "
178 << hinfo->get_total_logical_size(sinfo)
179 << " buffers are "
180 << op.buffer_updates
181 << dendl;
182 if (op.truncate) {
183 ldpp_dout(dpp, 20) << "generate_transactions: "
184 << " truncate is "
185 << *(op.truncate)
186 << dendl;
187 }
188
189 if (entry && op.updated_snaps) {
190 entry->mod_desc.update_snaps(op.updated_snaps->first);
191 }
192
193 map<string, boost::optional<bufferlist> > xattr_rollback;
194 assert(hinfo);
195 bufferlist old_hinfo;
196 ::encode(*hinfo, old_hinfo);
197 xattr_rollback[ECUtil::get_hinfo_key()] = old_hinfo;
198
199 if (op.is_none() && op.truncate && op.truncate->first == 0) {
200 assert(op.truncate->first == 0);
201 assert(op.truncate->first ==
202 op.truncate->second);
203 assert(entry);
204 assert(obc);
205
206 if (op.truncate->first != op.truncate->second) {
207 op.truncate->first = op.truncate->second;
208 } else {
209 op.truncate = boost::none;
210 }
211
212 op.delete_first = true;
213 op.init_type = PGTransaction::ObjectOperation::Init::Create();
214
215 if (obc) {
216 /* We need to reapply all of the cached xattrs.
217 * std::map insert fortunately only writes keys
218 * which don't already exist, so this should do
219 * the right thing. */
220 op.attr_updates.insert(
221 obc->attr_cache.begin(),
222 obc->attr_cache.end());
223 }
224 }
225
226 if (op.delete_first) {
227 /* We also want to remove the boost::none entries since
228 * the keys already won't exist */
229 for (auto j = op.attr_updates.begin();
230 j != op.attr_updates.end();
231 ) {
232 if (j->second) {
233 ++j;
234 } else {
235 op.attr_updates.erase(j++);
236 }
237 }
238 /* Fill in all current entries for xattr rollback */
239 if (obc) {
240 xattr_rollback.insert(
241 obc->attr_cache.begin(),
242 obc->attr_cache.end());
243 obc->attr_cache.clear();
244 }
245 if (entry) {
246 entry->mod_desc.rmobject(entry->version.version);
247 for (auto &&st: *transactions) {
248 st.second.collection_move_rename(
249 coll_t(spg_t(pgid, st.first)),
250 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
251 coll_t(spg_t(pgid, st.first)),
252 ghobject_t(oid, entry->version.version, st.first));
253 }
254 } else {
255 for (auto &&st: *transactions) {
256 st.second.remove(
257 coll_t(spg_t(pgid, st.first)),
258 ghobject_t(oid, ghobject_t::NO_GEN, st.first));
259 }
260 }
261 hinfo->clear();
262 }
263
264 if (op.is_fresh_object() && entry) {
265 entry->mod_desc.create();
266 }
267
268 match(
269 op.init_type,
270 [&](const PGTransaction::ObjectOperation::Init::None &) {},
271 [&](const PGTransaction::ObjectOperation::Init::Create &op) {
272 for (auto &&st: *transactions) {
273 st.second.touch(
274 coll_t(spg_t(pgid, st.first)),
275 ghobject_t(oid, ghobject_t::NO_GEN, st.first));
276 }
277 },
278 [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
279 for (auto &&st: *transactions) {
280 st.second.clone(
281 coll_t(spg_t(pgid, st.first)),
282 ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
283 ghobject_t(oid, ghobject_t::NO_GEN, st.first));
284 }
285
286 auto siter = hash_infos.find(op.source);
287 assert(siter != hash_infos.end());
288 hinfo->update_to(*(siter->second));
289
290 if (obc) {
291 auto cobciter = obc_map.find(op.source);
292 assert(cobciter != obc_map.end());
293 obc->attr_cache = cobciter->second->attr_cache;
294 }
295 },
296 [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
297 assert(op.source.is_temp());
298 for (auto &&st: *transactions) {
299 st.second.collection_move_rename(
300 coll_t(spg_t(pgid, st.first)),
301 ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
302 coll_t(spg_t(pgid, st.first)),
303 ghobject_t(oid, ghobject_t::NO_GEN, st.first));
304 }
305 auto siter = hash_infos.find(op.source);
306 assert(siter != hash_infos.end());
307 hinfo->update_to(*(siter->second));
308 if (obc) {
309 auto cobciter = obc_map.find(op.source);
310 assert(cobciter == obc_map.end());
311 obc->attr_cache.clear();
312 }
313 });
314
315 // omap not supported (except 0, handled above)
316 assert(!(op.clear_omap));
317 assert(!(op.omap_header));
318 assert(op.omap_updates.empty());
319
320 if (!op.attr_updates.empty()) {
321 map<string, bufferlist> to_set;
322 for (auto &&j: op.attr_updates) {
323 if (j.second) {
324 to_set[j.first] = *(j.second);
325 } else {
326 for (auto &&st : *transactions) {
327 st.second.rmattr(
328 coll_t(spg_t(pgid, st.first)),
329 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
330 j.first);
331 }
332 }
333 if (obc) {
334 auto citer = obc->attr_cache.find(j.first);
335 if (entry) {
336 if (citer != obc->attr_cache.end()) {
337 // won't overwrite anything we put in earlier
338 xattr_rollback.insert(
339 make_pair(
340 j.first,
341 boost::optional<bufferlist>(citer->second)));
342 } else {
343 // won't overwrite anything we put in earlier
344 xattr_rollback.insert(
345 make_pair(
346 j.first,
347 boost::none));
348 }
349 }
350 if (j.second) {
351 obc->attr_cache[j.first] = *(j.second);
352 } else if (citer != obc->attr_cache.end()) {
353 obc->attr_cache.erase(citer);
354 }
355 } else {
356 assert(!entry);
357 }
358 }
359 for (auto &&st : *transactions) {
360 st.second.setattrs(
361 coll_t(spg_t(pgid, st.first)),
362 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
363 to_set);
364 }
365 assert(!xattr_rollback.empty());
366 }
367 if (entry && !xattr_rollback.empty()) {
368 entry->mod_desc.setattrs(xattr_rollback);
369 }
370
371 if (op.alloc_hint) {
372 /* logical_to_next_chunk_offset() scales down both aligned and
373 * unaligned offsets
374
375 * we don't bother to roll this back at this time for two reasons:
376 * 1) it's advisory
377 * 2) we don't track the old value */
378 uint64_t object_size = sinfo.logical_to_next_chunk_offset(
379 op.alloc_hint->expected_object_size);
380 uint64_t write_size = sinfo.logical_to_next_chunk_offset(
381 op.alloc_hint->expected_write_size);
382
383 for (auto &&st : *transactions) {
384 st.second.set_alloc_hint(
385 coll_t(spg_t(pgid, st.first)),
386 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
387 object_size,
388 write_size,
389 op.alloc_hint->flags);
390 }
391 }
392
393 extent_map to_write;
394 auto pextiter = partial_extents.find(oid);
395 if (pextiter != partial_extents.end()) {
396 to_write = pextiter->second;
397 }
398
399 vector<pair<uint64_t, uint64_t> > rollback_extents;
400 const uint64_t orig_size = hinfo->get_total_logical_size(sinfo);
401
402 uint64_t new_size = orig_size;
403 uint64_t append_after = new_size;
404 ldpp_dout(dpp, 20) << __func__ << ": new_size start " << new_size << dendl;
405 if (op.truncate && op.truncate->first < new_size) {
406 assert(!op.is_fresh_object());
407 new_size = sinfo.logical_to_next_stripe_offset(
408 op.truncate->first);
409 ldpp_dout(dpp, 20) << __func__ << ": new_size truncate down "
410 << new_size << dendl;
411 if (new_size != op.truncate->first) { // 0 the unaligned part
412 bufferlist bl;
413 bl.append_zero(new_size - op.truncate->first);
414 to_write.insert(
415 op.truncate->first,
416 bl.length(),
417 bl);
418 append_after = sinfo.logical_to_prev_stripe_offset(
419 op.truncate->first);
420 } else {
421 append_after = new_size;
422 }
423 to_write.erase(
424 new_size,
425 std::numeric_limits<uint64_t>::max() - new_size);
426
427 if (entry && !op.is_fresh_object()) {
428 uint64_t restore_from = sinfo.logical_to_prev_chunk_offset(
429 op.truncate->first);
430 uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset(
431 orig_size -
432 sinfo.logical_to_prev_stripe_offset(op.truncate->first));
433 assert(rollback_extents.empty());
434
435 ldpp_dout(dpp, 20) << __func__ << ": saving extent "
436 << make_pair(restore_from, restore_len)
437 << dendl;
438 ldpp_dout(dpp, 20) << __func__ << ": truncating to "
439 << new_size
440 << dendl;
441 rollback_extents.emplace_back(
442 make_pair(restore_from, restore_len));
443 for (auto &&st : *transactions) {
444 st.second.touch(
445 coll_t(spg_t(pgid, st.first)),
446 ghobject_t(oid, entry->version.version, st.first));
447 st.second.clone_range(
448 coll_t(spg_t(pgid, st.first)),
449 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
450 ghobject_t(oid, entry->version.version, st.first),
451 restore_from,
452 restore_len,
453 restore_from);
454
455 }
456 } else {
457 ldpp_dout(dpp, 20) << __func__ << ": not saving extents, fresh object"
458 << dendl;
459 }
460 for (auto &&st : *transactions) {
461 st.second.truncate(
462 coll_t(spg_t(pgid, st.first)),
463 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
464 sinfo.aligned_logical_offset_to_chunk_offset(new_size));
465 }
466 }
467
468 uint32_t fadvise_flags = 0;
469 for (auto &&extent: op.buffer_updates) {
470 using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
471 bufferlist bl;
472 match(
473 extent.get_val(),
474 [&](const BufferUpdate::Write &op) {
475 bl = op.buffer;
476 fadvise_flags |= op.fadvise_flags;
477 },
478 [&](const BufferUpdate::Zero &) {
479 bl.append_zero(extent.get_len());
480 },
481 [&](const BufferUpdate::CloneRange &) {
482 assert(
483 0 ==
484 "CloneRange is not allowed, do_op should have returned ENOTSUPP");
485 });
486
487 uint64_t off = extent.get_off();
488 uint64_t len = extent.get_len();
489 uint64_t end = off + len;
490 ldpp_dout(dpp, 20) << __func__ << ": adding buffer_update "
491 << make_pair(off, len)
492 << dendl;
493 assert(len > 0);
494 if (off > new_size) {
495 assert(off > append_after);
496 bl.prepend_zero(off - new_size);
497 len += off - new_size;
498 ldpp_dout(dpp, 20) << __func__ << ": prepending zeroes to align "
499 << off << "->" << new_size
500 << dendl;
501 off = new_size;
502 }
503 if (!sinfo.logical_offset_is_stripe_aligned(end) && (end > append_after)) {
504 uint64_t aligned_end = sinfo.logical_to_next_stripe_offset(
505 end);
506 uint64_t tail = aligned_end - end;
507 bl.append_zero(tail);
508 ldpp_dout(dpp, 20) << __func__ << ": appending zeroes to align end "
509 << end << "->" << end+tail
510 << ", len: " << len << "->" << len+tail
511 << dendl;
512 end += tail;
513 len += tail;
514 }
515
516 to_write.insert(off, len, bl);
517 if (end > new_size)
518 new_size = end;
519 }
520
521 if (op.truncate &&
522 op.truncate->second > new_size) {
523 assert(op.truncate->second > append_after);
524 uint64_t truncate_to =
525 sinfo.logical_to_next_stripe_offset(
526 op.truncate->second);
527 uint64_t zeroes = truncate_to - new_size;
528 bufferlist bl;
529 bl.append_zero(zeroes);
530 to_write.insert(
531 new_size,
532 zeroes,
533 bl);
534 new_size = truncate_to;
535 ldpp_dout(dpp, 20) << __func__ << ": truncating out to "
536 << truncate_to
537 << dendl;
538 }
539
540 set<int> want;
541 for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) {
542 want.insert(i);
543 }
544 auto to_overwrite = to_write.intersect(0, append_after);
545 ldpp_dout(dpp, 20) << __func__ << ": to_overwrite: "
546 << to_overwrite
547 << dendl;
548 for (auto &&extent: to_overwrite) {
549 assert(extent.get_off() + extent.get_len() <= append_after);
550 assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
551 assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
552 if (entry) {
553 uint64_t restore_from = sinfo.aligned_logical_offset_to_chunk_offset(
554 extent.get_off());
555 uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset(
556 extent.get_len());
557 ldpp_dout(dpp, 20) << __func__ << ": overwriting "
558 << restore_from << "~" << restore_len
559 << dendl;
560 if (rollback_extents.empty()) {
561 for (auto &&st : *transactions) {
562 st.second.touch(
563 coll_t(spg_t(pgid, st.first)),
564 ghobject_t(oid, entry->version.version, st.first));
565 }
566 }
567 rollback_extents.emplace_back(make_pair(restore_from, restore_len));
568 for (auto &&st : *transactions) {
569 st.second.clone_range(
570 coll_t(spg_t(pgid, st.first)),
571 ghobject_t(oid, ghobject_t::NO_GEN, st.first),
572 ghobject_t(oid, entry->version.version, st.first),
573 restore_from,
574 restore_len,
575 restore_from);
576 }
577 }
578 encode_and_write(
579 pgid,
580 oid,
581 sinfo,
582 ecimpl,
583 want,
584 extent.get_off(),
585 extent.get_val(),
586 fadvise_flags,
587 hinfo,
588 written,
589 transactions,
590 dpp);
591 }
592
593 auto to_append = to_write.intersect(
594 append_after,
595 std::numeric_limits<uint64_t>::max() - append_after);
596 ldpp_dout(dpp, 20) << __func__ << ": to_append: "
597 << to_append
598 << dendl;
599 for (auto &&extent: to_append) {
600 assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
601 assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
602 ldpp_dout(dpp, 20) << __func__ << ": appending "
603 << extent.get_off() << "~" << extent.get_len()
604 << dendl;
605 encode_and_write(
606 pgid,
607 oid,
608 sinfo,
609 ecimpl,
610 want,
611 extent.get_off(),
612 extent.get_val(),
613 fadvise_flags,
614 hinfo,
615 written,
616 transactions,
617 dpp);
618 }
619
620 ldpp_dout(dpp, 20) << __func__ << ": " << oid
621 << " resetting hinfo to logical size "
622 << new_size
623 << dendl;
624 if (!rollback_extents.empty() && entry) {
625 if (entry) {
626 ldpp_dout(dpp, 20) << __func__ << ": " << oid
627 << " marking rollback extents "
628 << rollback_extents
629 << dendl;
630 entry->mod_desc.rollback_extents(
631 entry->version.version, rollback_extents);
632 }
633 hinfo->set_total_chunk_size_clear_hash(
634 sinfo.aligned_logical_offset_to_chunk_offset(new_size));
635 } else {
636 assert(hinfo->get_total_logical_size(sinfo) == new_size);
637 }
638
639 if (entry && !to_append.empty()) {
640 ldpp_dout(dpp, 20) << __func__ << ": marking append "
641 << append_after
642 << dendl;
643 entry->mod_desc.append(append_after);
644 }
645
646 if (!op.is_delete()) {
647 bufferlist hbuf;
648 ::encode(*hinfo, hbuf);
649 for (auto &&i : *transactions) {
650 i.second.setattr(
651 coll_t(spg_t(pgid, i.first)),
652 ghobject_t(oid, ghobject_t::NO_GEN, i.first),
653 ECUtil::get_hinfo_key(),
654 hbuf);
655 }
656 }
657 });
658}