1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 /*
3 * Ceph - scalable distributed file system
4 *
5 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
6 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
7 *
8 * Author: Loic Dachary <loic@dachary.org>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_REPLICATEDPG_H
18 #define CEPH_REPLICATEDPG_H
19
20 #include <boost/tuple/tuple.hpp>
21 #include "include/assert.h"
22 #include "PG.h"
23 #include "Watch.h"
24 #include "TierAgentState.h"
25 #include "messages/MOSDOpReply.h"
26 #include "common/Checksummer.h"
27 #include "common/sharedptr_registry.hpp"
28 #include "ReplicatedBackend.h"
29 #include "PGTransaction.h"
30
31 class CopyFromCallback;
32 class PromoteCallback;
33
34 class PrimaryLogPG;
35 class PGLSFilter;
36 class HitSet;
37 struct TierAgentState;
38 class MOSDOp;
39 class MOSDOpReply;
40 class OSDService;
41
42 void intrusive_ptr_add_ref(PrimaryLogPG *pg);
43 void intrusive_ptr_release(PrimaryLogPG *pg);
44 uint64_t get_with_id(PrimaryLogPG *pg);
45 void put_with_id(PrimaryLogPG *pg, uint64_t id);
46
47 #ifdef PG_DEBUG_REFS
48 typedef TrackedIntPtr<PrimaryLogPG> PrimaryLogPGRef;
49 #else
50 typedef boost::intrusive_ptr<PrimaryLogPG> PrimaryLogPGRef;
51 #endif
52
53 struct inconsistent_snapset_wrapper;
54
55 class PrimaryLogPG : public PG, public PGBackend::Listener {
56 friend class OSD;
57 friend class Watch;
58
59 public:
60 MEMPOOL_CLASS_HELPERS();
61
62 /*
63 * state associated with a copy operation
64 */
65 struct OpContext;
66 class CopyCallback;
67
68 /**
69 * CopyResults stores the object metadata of interest to a copy initiator.
70 */
71 struct CopyResults {
72 ceph::real_time mtime; ///< the copy source's mtime
73 uint64_t object_size; ///< the copied object's size
74 bool started_temp_obj; ///< true if the callback needs to delete temp object
75 hobject_t temp_oid; ///< temp object (if any)
76
77 /**
78 * Function to fill in transaction; if non-empty the callback
79 * must execute it before any other accesses to the object
80 * (in order to complete the copy).
81 */
82 std::function<void(PGTransaction *)> fill_in_final_tx;
83
84 version_t user_version; ///< The copy source's user version
85 bool should_requeue; ///< op should be requeued on cancel
86 vector<snapid_t> snaps; ///< src's snaps (if clone)
87 snapid_t snap_seq; ///< src's snap_seq (if head)
88 librados::snap_set_t snapset; ///< src snapset (if head)
89 bool mirror_snapset;
90 bool has_omap;
91 uint32_t flags; // object_copy_data_t::FLAG_*
92 uint32_t source_data_digest, source_omap_digest;
93 uint32_t data_digest, omap_digest;
94 mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
95 map<string, bufferlist> attrs; // xattrs
96 uint64_t truncate_seq;
97 uint64_t truncate_size;
98 bool is_data_digest() {
99 return flags & object_copy_data_t::FLAG_DATA_DIGEST;
100 }
101 bool is_omap_digest() {
102 return flags & object_copy_data_t::FLAG_OMAP_DIGEST;
103 }
104 CopyResults()
105 : object_size(0), started_temp_obj(false),
106 user_version(0),
107 should_requeue(false), mirror_snapset(false),
108 has_omap(false),
109 flags(0),
110 source_data_digest(-1), source_omap_digest(-1),
111 data_digest(-1), omap_digest(-1),
112 truncate_seq(0), truncate_size(0)
113 {}
114 };
115
116 struct CopyOp {
117 CopyCallback *cb;
118 ObjectContextRef obc;
119 hobject_t src;
120 object_locator_t oloc;
121 unsigned flags;
122 bool mirror_snapset;
123
124 CopyResults results;
125
126 ceph_tid_t objecter_tid;
127 ceph_tid_t objecter_tid2;
128
129 object_copy_cursor_t cursor;
130 map<string,bufferlist> attrs;
131 bufferlist data;
132 bufferlist omap_header;
133 bufferlist omap_data;
134 int rval;
135
136 object_copy_cursor_t temp_cursor;
137
138 /*
139 * For CopyOp the process is:
140 * step 1: read the data (attrs, omap and object data) from the source object
141 * step 2: write that data out to the new (destination) object
142 * src_obj_fadvise_flags is used in step 1;
143 * dest_obj_fadvise_flags is used in step 2
144 */
145 unsigned src_obj_fadvise_flags;
146 unsigned dest_obj_fadvise_flags;
147
148 CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s,
149 object_locator_t l,
150 version_t v,
151 unsigned f,
152 bool ms,
153 unsigned src_obj_fadvise_flags,
154 unsigned dest_obj_fadvise_flags)
155 : cb(cb_), obc(_obc), src(s), oloc(l), flags(f),
156 mirror_snapset(ms),
157 objecter_tid(0),
158 objecter_tid2(0),
159 rval(-1),
160 src_obj_fadvise_flags(src_obj_fadvise_flags),
161 dest_obj_fadvise_flags(dest_obj_fadvise_flags)
162 {
163 results.user_version = v;
164 results.mirror_snapset = mirror_snapset;
165 }
166 };
167 typedef ceph::shared_ptr<CopyOp> CopyOpRef;
168
169 /**
170 * The CopyCallback class defines an interface for completions to the
171 * copy_start code. Users of the copy infrastructure must implement
172 * one and give an instance of the class to start_copy.
173 *
174 * The implementer is responsible for making sure that the CopyCallback
175 * can associate itself with the correct copy operation.
176 */
177 typedef boost::tuple<int, CopyResults*> CopyCallbackResults;
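  // Roughly, a concrete callback (the real ones live in PrimaryLogPG.cc)
  // derives from GenContext<CopyCallbackResults> and unpacks the tuple in
  // finish(), e.g.:
  //
  //   void finish(CopyCallbackResults results_) {
  //     int r = results_.get<0>();
  //     CopyResults *results = results_.get<1>();
  //     if (r >= 0) { /* apply results->fill_in_final_tx, etc. */ }
  //   }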
178
179 friend class CopyFromCallback;
180 friend class CopyFromFinisher;
181 friend class PromoteCallback;
182
183 struct ProxyReadOp {
184 OpRequestRef op;
185 hobject_t soid;
186 ceph_tid_t objecter_tid;
187 vector<OSDOp> &ops;
188 version_t user_version;
189 int data_offset;
190 bool canceled; ///< true if canceled
191
192 ProxyReadOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops)
193 : op(_op), soid(oid),
194 objecter_tid(0), ops(_ops),
195 user_version(0), data_offset(0),
196 canceled(false) { }
197 };
198 typedef ceph::shared_ptr<ProxyReadOp> ProxyReadOpRef;
199
200 struct ProxyWriteOp {
201 OpContext *ctx;
202 OpRequestRef op;
203 hobject_t soid;
204 ceph_tid_t objecter_tid;
205 vector<OSDOp> &ops;
206 version_t user_version;
207 bool sent_reply;
208 utime_t mtime;
209 bool canceled;
210 osd_reqid_t reqid;
211
212 ProxyWriteOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops, osd_reqid_t _reqid)
213 : ctx(NULL), op(_op), soid(oid),
214 objecter_tid(0), ops(_ops),
215 user_version(0), sent_reply(false),
216 canceled(false),
217 reqid(_reqid) { }
218 };
219 typedef ceph::shared_ptr<ProxyWriteOp> ProxyWriteOpRef;
220
221 struct FlushOp {
222 ObjectContextRef obc; ///< obc we are flushing
223 OpRequestRef op; ///< initiating op
224 list<OpRequestRef> dup_ops; ///< bandwagon jumpers
225 version_t flushed_version; ///< user version we are flushing
226 ceph_tid_t objecter_tid; ///< copy-from request tid
227 int rval; ///< copy-from result
228 bool blocking; ///< whether we are blocking updates
229 bool removal; ///< we are removing the backend object
230 boost::optional<std::function<void()>> on_flush; ///< callback, may be null
231 // for chunked object
232 map<uint64_t, ceph_tid_t> io_tids;
233
234 FlushOp()
235 : flushed_version(0), objecter_tid(0), rval(0),
236 blocking(false), removal(false) {}
237 ~FlushOp() { assert(!on_flush); }
238 };
239 typedef ceph::shared_ptr<FlushOp> FlushOpRef;
240
241 boost::scoped_ptr<PGBackend> pgbackend;
242 PGBackend *get_pgbackend() override {
243 return pgbackend.get();
244 }
245
246 /// Listener methods
247 DoutPrefixProvider *get_dpp() override {
248 return this;
249 }
250
251 void on_local_recover(
252 const hobject_t &oid,
253 const ObjectRecoveryInfo &recovery_info,
254 ObjectContextRef obc,
255 bool is_delete,
256 ObjectStore::Transaction *t
257 ) override;
258 void on_peer_recover(
259 pg_shard_t peer,
260 const hobject_t &oid,
261 const ObjectRecoveryInfo &recovery_info
262 ) override;
263 void begin_peer_recover(
264 pg_shard_t peer,
265 const hobject_t oid) override;
266 void on_global_recover(
267 const hobject_t &oid,
268 const object_stat_sum_t &stat_diff,
269 bool is_delete) override;
270 void failed_push(const list<pg_shard_t> &from, const hobject_t &soid) override;
271 void primary_failed(const hobject_t &soid) override;
272 bool primary_error(const hobject_t& soid, eversion_t v) override;
273 void cancel_pull(const hobject_t &soid) override;
274 void apply_stats(
275 const hobject_t &soid,
276 const object_stat_sum_t &delta_stats) override;
277 void on_primary_error(const hobject_t &oid, eversion_t v) override;
278 void backfill_add_missing(const hobject_t &oid, eversion_t v) override;
279 void remove_missing_object(const hobject_t &oid,
280 eversion_t v,
281 Context *on_complete) override;
282
283 template<class T> class BlessedGenContext;
284 class BlessedContext;
285 Context *bless_context(Context *c) override;
286
287 GenContext<ThreadPool::TPHandle&> *bless_gencontext(
288 GenContext<ThreadPool::TPHandle&> *c) override;
289
290 void send_message(int to_osd, Message *m) override {
291 osd->send_message_osd_cluster(to_osd, m, get_osdmap()->get_epoch());
292 }
293 void queue_transaction(ObjectStore::Transaction&& t,
294 OpRequestRef op) override {
295 osd->store->queue_transaction(osr.get(), std::move(t), 0, 0, 0, op);
296 }
297 void queue_transactions(vector<ObjectStore::Transaction>& tls,
298 OpRequestRef op) override {
299 osd->store->queue_transactions(osr.get(), tls, 0, 0, 0, op, NULL);
300 }
301 epoch_t get_epoch() const override {
302 return get_osdmap()->get_epoch();
303 }
304 epoch_t get_interval_start_epoch() const override {
305 return info.history.same_interval_since;
306 }
307 epoch_t get_last_peering_reset_epoch() const override {
308 return get_last_peering_reset();
309 }
310 const set<pg_shard_t> &get_actingbackfill_shards() const override {
311 return actingbackfill;
312 }
313 const set<pg_shard_t> &get_acting_shards() const override {
314 return actingset;
315 }
316 const set<pg_shard_t> &get_backfill_shards() const override {
317 return backfill_targets;
318 }
319
320 std::string gen_dbg_prefix() const override { return gen_prefix(); }
321
322 const map<hobject_t, set<pg_shard_t>>
323 &get_missing_loc_shards() const override {
324 return missing_loc.get_missing_locs();
325 }
326 const map<pg_shard_t, pg_missing_t> &get_shard_missing() const override {
327 return peer_missing;
328 }
329 using PGBackend::Listener::get_shard_missing;
330 const map<pg_shard_t, pg_info_t> &get_shard_info() const override {
331 return peer_info;
332 }
333 using PGBackend::Listener::get_shard_info;
334 const pg_missing_tracker_t &get_local_missing() const override {
335 return pg_log.get_missing();
336 }
337 const PGLog &get_log() const override {
338 return pg_log;
339 }
340 bool pgb_is_primary() const override {
341 return is_primary();
342 }
343 OSDMapRef pgb_get_osdmap() const override {
344 return get_osdmap();
345 }
346 const pg_info_t &get_info() const override {
347 return info;
348 }
349 const pg_pool_t &get_pool() const override {
350 return pool.info;
351 }
352
353 ObjectContextRef get_obc(
354 const hobject_t &hoid,
355 const map<string, bufferlist> &attrs) override {
356 return get_object_context(hoid, true, &attrs);
357 }
358
359 bool try_lock_for_read(
360 const hobject_t &hoid,
361 ObcLockManager &manager) override {
362 if (is_missing_object(hoid))
363 return false;
364 auto obc = get_object_context(hoid, false, nullptr);
365 if (!obc)
366 return false;
367 return manager.try_get_read_lock(hoid, obc);
368 }
369
370 void release_locks(ObcLockManager &manager) override {
371 release_object_locks(manager);
372 }
373
374 void pgb_set_object_snap_mapping(
375 const hobject_t &soid,
376 const set<snapid_t> &snaps,
377 ObjectStore::Transaction *t) override {
378 return update_object_snap_mapping(t, soid, snaps);
379 }
380 void pgb_clear_object_snap_mapping(
381 const hobject_t &soid,
382 ObjectStore::Transaction *t) override {
383 return clear_object_snap_mapping(t, soid);
384 }
385
386 void log_operation(
387 const vector<pg_log_entry_t> &logv,
388 const boost::optional<pg_hit_set_history_t> &hset_history,
389 const eversion_t &trim_to,
390 const eversion_t &roll_forward_to,
391 bool transaction_applied,
392 ObjectStore::Transaction &t) override {
393 if (hset_history) {
394 info.hit_set = *hset_history;
395 }
396 append_log(logv, trim_to, roll_forward_to, t, transaction_applied);
397 }
398
399 struct C_OSD_OnApplied;
400 void op_applied(
401 const eversion_t &applied_version) override;
402
403 bool should_send_op(
404 pg_shard_t peer,
405 const hobject_t &hoid) override {
406 if (peer == get_primary())
407 return true;
408 assert(peer_info.count(peer));
409 bool should_send =
410 hoid.pool != (int64_t)info.pgid.pool() ||
411 hoid <= last_backfill_started ||
412 hoid <= peer_info[peer].last_backfill;
413 if (!should_send)
414 assert(is_backfill_targets(peer));
415 return should_send;
416 }
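  // In other words: an op is sent to a shard only for objects that shard
  // already has -- at or below its last_backfill, or at or below the
  // in-progress last_backfill_started -- plus objects outside the pg's pool
  // id (e.g. temp objects).  Writes beyond that point are skipped; backfill
  // will push the object later anyway.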
417
418 void update_peer_last_complete_ondisk(
419 pg_shard_t fromosd,
420 eversion_t lcod) override {
421 peer_last_complete_ondisk[fromosd] = lcod;
422 }
423
424 void update_last_complete_ondisk(
425 eversion_t lcod) override {
426 last_complete_ondisk = lcod;
427 }
428
429 void update_stats(
430 const pg_stat_t &stat) override {
431 info.stats = stat;
432 }
433
434 void schedule_recovery_work(
435 GenContext<ThreadPool::TPHandle&> *c) override;
436
437 pg_shard_t whoami_shard() const override {
438 return pg_whoami;
439 }
440 spg_t primary_spg_t() const override {
441 return spg_t(info.pgid.pgid, primary.shard);
442 }
443 pg_shard_t primary_shard() const override {
444 return primary;
445 }
446 uint64_t min_peer_features() const override {
447 return get_min_peer_features();
448 }
449
450 void send_message_osd_cluster(
451 int peer, Message *m, epoch_t from_epoch) override;
452 void send_message_osd_cluster(
453 Message *m, Connection *con) override;
454 void send_message_osd_cluster(
455 Message *m, const ConnectionRef& con) override;
456 ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch) override;
457 entity_name_t get_cluster_msgr_name() override {
458 return osd->get_cluster_msgr_name();
459 }
460
461 PerfCounters *get_logger() override;
462
463 ceph_tid_t get_tid() override { return osd->get_tid(); }
464
465 LogClientTemp clog_error() override { return osd->clog->error(); }
466 LogClientTemp clog_warn() override { return osd->clog->warn(); }
467
468 struct watch_disconnect_t {
469 uint64_t cookie;
470 entity_name_t name;
471 bool send_disconnect;
472 watch_disconnect_t(uint64_t c, entity_name_t n, bool sd)
473 : cookie(c), name(n), send_disconnect(sd) {}
474 };
475 void complete_disconnect_watches(
476 ObjectContextRef obc,
477 const list<watch_disconnect_t> &to_disconnect);
478
479 struct OpFinisher {
480 virtual ~OpFinisher() {
481 }
482
483 virtual int execute() = 0;
484 };
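  // OpFinishers are stored in OpContext::op_finishers keyed by the subop
  // index; execute() is invoked once the asynchronous part of that subop
  // (e.g. an async read or a copy) has completed, to fill in the OSDOp
  // result (used by ops such as CHECKSUM, CMPEXT and COPY_FROM).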
485
486 /*
487 * Capture all object state associated with an in-progress read or write.
488 */
489 struct OpContext {
490 OpRequestRef op;
491 osd_reqid_t reqid;
492 vector<OSDOp> *ops;
493
494 const ObjectState *obs; // Old objectstate
495 const SnapSet *snapset; // Old snapset
496
497 ObjectState new_obs; // resulting ObjectState
498 SnapSet new_snapset; // resulting SnapSet (in case of a write)
499 //pg_stat_t new_stats; // resulting Stats
500 object_stat_sum_t delta_stats;
501
502 bool modify; // (force) modification (even if op_t is empty)
503 bool user_modify; // user-visible modification
504 bool undirty; // user explicitly un-dirtying this object
505 bool cache_evict; ///< true if this is a cache eviction
506 bool ignore_cache; ///< true if IGNORE_CACHE flag is set
507 bool ignore_log_op_stats; // don't log op stats
508 bool update_log_only; ///< this is a write that returned an error - just record in pg log for dup detection
509
510 // side effects
511 list<pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
512 list<watch_disconnect_t> watch_disconnects; ///< old watch + send_discon
513 list<notify_info_t> notifies;
514 struct NotifyAck {
515 boost::optional<uint64_t> watch_cookie;
516 uint64_t notify_id;
517 bufferlist reply_bl;
518 explicit NotifyAck(uint64_t notify_id) : notify_id(notify_id) {}
519 NotifyAck(uint64_t notify_id, uint64_t cookie, bufferlist& rbl)
520 : watch_cookie(cookie), notify_id(notify_id) {
521 reply_bl.claim(rbl);
522 }
523 };
524 list<NotifyAck> notify_acks;
525
526 uint64_t bytes_written, bytes_read;
527
528 utime_t mtime;
529 SnapContext snapc; // writer snap context
530 eversion_t at_version; // pg's current version pointer
531 version_t user_at_version; // pg's current user version pointer
532
533 /// index of the current subop - only valid inside of do_osd_ops()
534 int current_osd_subop_num;
535 /// total number of subops processed in this context for cls_cxx_subop_version()
536 int processed_subop_count = 0;
537
538 PGTransactionUPtr op_t;
539 vector<pg_log_entry_t> log;
540 boost::optional<pg_hit_set_history_t> updated_hset_history;
541
542 interval_set<uint64_t> modified_ranges;
543 ObjectContextRef obc;
544 ObjectContextRef clone_obc; // if we created a clone
545 ObjectContextRef snapset_obc; // if we created/deleted a snapdir
546
547 // FIXME: we may want to kill this msgr hint off at some point!
548 boost::optional<int> data_off = boost::none;
549
550 MOSDOpReply *reply;
551
552 PrimaryLogPG *pg;
553
554 int num_read; ///< count read ops
555 int num_write; ///< count update ops
556
557 mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > extra_reqids;
558
559 hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking
560
561 list<std::function<void()>> on_applied;
562 list<std::function<void()>> on_committed;
563 list<std::function<void()>> on_finish;
564 list<std::function<void()>> on_success;
565 template <typename F>
566 void register_on_finish(F &&f) {
567 on_finish.emplace_back(std::forward<F>(f));
568 }
569 template <typename F>
570 void register_on_success(F &&f) {
571 on_success.emplace_back(std::forward<F>(f));
572 }
573 template <typename F>
574 void register_on_applied(F &&f) {
575 on_applied.emplace_back(std::forward<F>(f));
576 }
577 template <typename F>
578 void register_on_commit(F &&f) {
579 on_committed.emplace_back(std::forward<F>(f));
580 }
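  // Rough ordering of the callback lists above: on_applied fires once every
  // shard has applied the transaction, on_committed once every shard has
  // committed it, on_success only for operations that completed without
  // error, and on_finish unconditionally when the operation is torn down.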
581
582 bool sent_reply;
583
584 // pending async reads <off, len, op_flags> -> <outbl, outr>
585 list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
586 pair<bufferlist*, Context*> > > pending_async_reads;
587 int inflightreads;
588 friend struct OnReadComplete;
589 void start_async_reads(PrimaryLogPG *pg);
590 void finish_read(PrimaryLogPG *pg);
591 bool async_reads_complete() {
592 return inflightreads == 0;
593 }
594
595 ObjectContext::RWState::State lock_type;
596 ObcLockManager lock_manager;
597
598 std::map<int, std::unique_ptr<OpFinisher>> op_finishers;
599
600 OpContext(const OpContext& other);
601 const OpContext& operator=(const OpContext& other);
602
603 OpContext(OpRequestRef _op, osd_reqid_t _reqid, vector<OSDOp>* _ops,
604 ObjectContextRef& obc,
605 PrimaryLogPG *_pg) :
606 op(_op), reqid(_reqid), ops(_ops),
607 obs(&obc->obs),
608 snapset(0),
609 new_obs(obs->oi, obs->exists),
610 modify(false), user_modify(false), undirty(false), cache_evict(false),
611 ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
612 bytes_written(0), bytes_read(0), user_at_version(0),
613 current_osd_subop_num(0),
614 obc(obc),
615 reply(NULL), pg(_pg),
616 num_read(0),
617 num_write(0),
618 sent_reply(false),
619 inflightreads(0),
620 lock_type(ObjectContext::RWState::RWNONE) {
621 if (obc->ssc) {
622 new_snapset = obc->ssc->snapset;
623 snapset = &obc->ssc->snapset;
624 }
625 }
626 OpContext(OpRequestRef _op, osd_reqid_t _reqid,
627 vector<OSDOp>* _ops, PrimaryLogPG *_pg) :
628 op(_op), reqid(_reqid), ops(_ops), obs(NULL), snapset(0),
629 modify(false), user_modify(false), undirty(false), cache_evict(false),
630 ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
631 bytes_written(0), bytes_read(0), user_at_version(0),
632 current_osd_subop_num(0),
633 reply(NULL), pg(_pg),
634 num_read(0),
635 num_write(0),
636 inflightreads(0),
637 lock_type(ObjectContext::RWState::RWNONE) {}
638 void reset_obs(ObjectContextRef obc) {
639 new_obs = ObjectState(obc->obs.oi, obc->obs.exists);
640 if (obc->ssc) {
641 new_snapset = obc->ssc->snapset;
642 snapset = &obc->ssc->snapset;
643 }
644 }
645 ~OpContext() {
646 assert(!op_t);
647 if (reply)
648 reply->put();
649 for (list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
650 pair<bufferlist*, Context*> > >::iterator i =
651 pending_async_reads.begin();
652 i != pending_async_reads.end();
653 pending_async_reads.erase(i++)) {
654 delete i->second.second;
655 }
656 }
657 uint64_t get_features() {
658 if (op && op->get_req()) {
659 return op->get_req()->get_connection()->get_features();
660 }
661 return -1ull;
662 }
663 };
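  // Rough lifecycle: an OpContext is built in do_op()/simple_opc_create(),
  // filled in by do_osd_ops()/prepare_transaction(), attached to a RepGather
  // by issue_repop(), and torn down via close_op_ctx() once the callbacks
  // registered above have been consumed.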
664 using OpContextUPtr = std::unique_ptr<OpContext>;
665 friend struct OpContext;
666
667 /*
668 * State on the PG primary associated with the replicated mutation
669 */
670 class RepGather {
671 public:
672 hobject_t hoid;
673 OpRequestRef op;
674 xlist<RepGather*>::item queue_item;
675 int nref;
676
677 eversion_t v;
678 int r = 0;
679
680 ceph_tid_t rep_tid;
681
682 bool rep_aborted, rep_done;
683
684 bool all_applied;
685 bool all_committed;
686 const bool applies_with_commit;
687
688 utime_t start;
689
690 eversion_t pg_local_last_complete;
691
692 ObcLockManager lock_manager;
693
694 list<std::function<void()>> on_applied;
695 list<std::function<void()>> on_committed;
696 list<std::function<void()>> on_success;
697 list<std::function<void()>> on_finish;
698
699 RepGather(
700 OpContext *c, ceph_tid_t rt,
701 eversion_t lc,
702 bool applies_with_commit) :
703 hoid(c->obc->obs.oi.soid),
704 op(c->op),
705 queue_item(this),
706 nref(1),
707 rep_tid(rt),
708 rep_aborted(false), rep_done(false),
709 all_applied(false), all_committed(false),
710 applies_with_commit(applies_with_commit),
711 pg_local_last_complete(lc),
712 lock_manager(std::move(c->lock_manager)),
713 on_applied(std::move(c->on_applied)),
714 on_committed(std::move(c->on_committed)),
715 on_success(std::move(c->on_success)),
716 on_finish(std::move(c->on_finish)) {}
717
718 RepGather(
719 ObcLockManager &&manager,
720 OpRequestRef &&o,
721 boost::optional<std::function<void(void)> > &&on_complete,
722 ceph_tid_t rt,
723 eversion_t lc,
724 bool applies_with_commit,
725 int r) :
726 op(o),
727 queue_item(this),
728 nref(1),
729 r(r),
730 rep_tid(rt),
731 rep_aborted(false), rep_done(false),
732 all_applied(false), all_committed(false),
733 applies_with_commit(applies_with_commit),
734 pg_local_last_complete(lc),
735 lock_manager(std::move(manager)) {
736 if (on_complete) {
737 on_success.push_back(std::move(*on_complete));
738 }
739 }
740
741 RepGather *get() {
742 nref++;
743 return this;
744 }
745 void put() {
746 assert(nref > 0);
747 if (--nref == 0) {
748 assert(on_applied.empty());
749 delete this;
750 //generic_dout(0) << "deleting " << this << dendl;
751 }
752 }
753 };
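  // RepGather is intrusively refcounted via get()/put(); the
  // intrusive_ptr_add_ref()/intrusive_ptr_release() overloads declared at
  // the end of this header let boost::intrusive_ptr<RepGather> manage it.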
754
755
756 protected:
757
758 /**
759 * Grabs locks for OpContext, should be cleaned up in close_op_ctx
760 *
761 * @param ctx [in,out] ctx to get locks for
762 * @return true on success, false if we are queued
763 */
764 bool get_rw_locks(bool write_ordered, OpContext *ctx) {
765 /* If snapset_obc is set, then !obc->obs.exists and we will always take
766  * the snapdir lock *before* the head lock.  Since all callers (read or
767  * write) follow that order, getting the first lock guarantees that we
768  * can get the second.
769 */
770 if (write_ordered && ctx->op->may_read()) {
771 ctx->lock_type = ObjectContext::RWState::RWEXCL;
772 } else if (write_ordered) {
773 ctx->lock_type = ObjectContext::RWState::RWWRITE;
774 } else {
775 assert(ctx->op->may_read());
776 ctx->lock_type = ObjectContext::RWState::RWREAD;
777 }
778
779 if (ctx->snapset_obc) {
780 assert(!ctx->obc->obs.exists);
781 if (!ctx->lock_manager.get_lock_type(
782 ctx->lock_type,
783 ctx->snapset_obc->obs.oi.soid,
784 ctx->snapset_obc,
785 ctx->op)) {
786 ctx->lock_type = ObjectContext::RWState::RWNONE;
787 return false;
788 }
789 }
790 if (ctx->lock_manager.get_lock_type(
791 ctx->lock_type,
792 ctx->obc->obs.oi.soid,
793 ctx->obc,
794 ctx->op)) {
795 return true;
796 } else {
797 assert(!ctx->snapset_obc);
798 ctx->lock_type = ObjectContext::RWState::RWNONE;
799 return false;
800 }
801 }
802
803 /**
804 * Cleans up OpContext
805 *
806 * @param ctx [in] ctx to clean up
807 */
808 void close_op_ctx(OpContext *ctx);
809
810 /**
811 * Releases locks
812 *
813 * @param manager [in] manager with locks to release
814 */
815 void release_object_locks(
816 ObcLockManager &lock_manager) {
817 list<pair<hobject_t, list<OpRequestRef> > > to_req;
818 bool requeue_recovery = false;
819 bool requeue_snaptrim = false;
820 lock_manager.put_locks(
821 &to_req,
822 &requeue_recovery,
823 &requeue_snaptrim);
824 if (requeue_recovery)
825 queue_recovery();
826 if (requeue_snaptrim)
827 snap_trimmer_machine.process_event(TrimWriteUnblocked());
828
829 if (!to_req.empty()) {
830 // requeue at front of scrub blocking queue if we are blocked by scrub
831 for (auto &&p: to_req) {
832 if (write_blocked_by_scrub(p.first.get_head())) {
833 for (auto& op : p.second) {
834 op->mark_delayed("waiting for scrub");
835 }
836 waiting_for_scrub.splice(
837 waiting_for_scrub.begin(),
838 p.second,
839 p.second.begin(),
840 p.second.end());
841 } else {
842 requeue_ops(p.second);
843 }
844 }
845 }
846 }
847
848 // replica ops
849 // [primary|tail]
850 xlist<RepGather*> repop_queue;
851
852 friend class C_OSD_RepopApplied;
853 friend class C_OSD_RepopCommit;
854 void repop_all_applied(RepGather *repop);
855 void repop_all_committed(RepGather *repop);
856 void eval_repop(RepGather*);
857 void issue_repop(RepGather *repop, OpContext *ctx);
858 RepGather *new_repop(
859 OpContext *ctx,
860 ObjectContextRef obc,
861 ceph_tid_t rep_tid);
862 boost::intrusive_ptr<RepGather> new_repop(
863 eversion_t version,
864 int r,
865 ObcLockManager &&manager,
866 OpRequestRef &&op,
867 boost::optional<std::function<void(void)> > &&on_complete);
868 void remove_repop(RepGather *repop);
869
870 OpContextUPtr simple_opc_create(ObjectContextRef obc);
871 void simple_opc_submit(OpContextUPtr ctx);
872
873 /**
874 * Atomically merge log entries into the logs of all actingbackfill
875 * OSDs, adjusting missing and recovery state as necessary.
876 *
877 * Also used to store error log entries for dup detection.
878 */
879 void submit_log_entries(
880 const mempool::osd_pglog::list<pg_log_entry_t> &entries,
881 ObcLockManager &&manager,
882 boost::optional<std::function<void(void)> > &&on_complete,
883 OpRequestRef op = OpRequestRef(),
884 int r = 0);
885 struct LogUpdateCtx {
886 boost::intrusive_ptr<RepGather> repop;
887 set<pg_shard_t> waiting_on;
888 };
889 void cancel_log_updates();
890 map<ceph_tid_t, LogUpdateCtx> log_entry_update_waiting_on;
891
892
893 // hot/cold tracking
894 HitSetRef hit_set; ///< currently accumulating HitSet
895 utime_t hit_set_start_stamp; ///< time the current HitSet started recording
896
897
898 void hit_set_clear(); ///< discard any HitSet state
899 void hit_set_setup(); ///< initialize HitSet state
900 void hit_set_create(); ///< create a new HitSet
901 void hit_set_persist(); ///< persist hit info
902 bool hit_set_apply_log(); ///< apply log entries to update in-memory HitSet
903 void hit_set_trim(OpContextUPtr &ctx, unsigned max); ///< discard old HitSets
904 void hit_set_in_memory_trim(uint32_t max_in_memory); ///< discard old in memory HitSets
905 void hit_set_remove_all();
906
907 hobject_t get_hit_set_current_object(utime_t stamp);
908 hobject_t get_hit_set_archive_object(utime_t start,
909 utime_t end,
910 bool using_gmt);
911
912 // agent
913 boost::scoped_ptr<TierAgentState> agent_state;
914
915 void agent_setup(); ///< initialize agent state
916 bool agent_work(int max) override ///< entry point to do some agent work
917 {
918 return agent_work(max, max);
919 }
920 bool agent_work(int max, int agent_flush_quota) override;
921 bool agent_maybe_flush(ObjectContextRef& obc); ///< maybe flush
922 bool agent_maybe_evict(ObjectContextRef& obc, bool after_flush); ///< maybe evict
923
924 void agent_load_hit_sets(); ///< load HitSets, if needed
925
926 /// estimate object atime and temperature
927 ///
928 /// @param oid [in] object name
929 /// @param temperature [out] relative temperature (considers both access time and frequency)
930 void agent_estimate_temp(const hobject_t& oid, int *temperature);
931
932 /// stop the agent
933 void agent_stop() override;
934 void agent_delay() override;
935
936 /// clear agent state
937 void agent_clear() override;
938
939 /// choose (new) agent mode(s), returns true if op is requeued
940 bool agent_choose_mode(bool restart = false, OpRequestRef op = OpRequestRef());
941 void agent_choose_mode_restart() override;
942
943 /// true if we can send an ondisk/commit for v
944 bool already_complete(eversion_t v);
945 /// true if we can send an ack for v
946 bool already_ack(eversion_t v);
947
948 // projected object info
949 SharedLRU<hobject_t, ObjectContext> object_contexts;
950 // map from oid.snapdir() to SnapSetContext *
951 map<hobject_t, SnapSetContext*> snapset_contexts;
952 Mutex snapset_contexts_lock;
953
954 // debug: the order in which client ops are applied
955 map<hobject_t, map<client_t, ceph_tid_t>> debug_op_order;
956
957 void populate_obc_watchers(ObjectContextRef obc);
958 void check_blacklisted_obc_watchers(ObjectContextRef obc);
959 void check_blacklisted_watchers() override;
960 void get_watchers(list<obj_watch_item_t> &pg_watchers) override;
961 void get_obc_watchers(ObjectContextRef obc, list<obj_watch_item_t> &pg_watchers);
962 public:
963 void handle_watch_timeout(WatchRef watch);
964 protected:
965
966 ObjectContextRef create_object_context(const object_info_t& oi, SnapSetContext *ssc);
967 ObjectContextRef get_object_context(
968 const hobject_t& soid,
969 bool can_create,
970 const map<string, bufferlist> *attrs = 0
971 );
972
973 void context_registry_on_change();
974 void object_context_destructor_callback(ObjectContext *obc);
975 class C_PG_ObjectContext;
976
977 int find_object_context(const hobject_t& oid,
978 ObjectContextRef *pobc,
979 bool can_create,
980 bool map_snapid_to_clone=false,
981 hobject_t *missing_oid=NULL);
982
983 void add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t *stat);
984
985 void get_src_oloc(const object_t& oid, const object_locator_t& oloc, object_locator_t& src_oloc);
986
987 SnapSetContext *get_snapset_context(
988 const hobject_t& oid,
989 bool can_create,
990 const map<string, bufferlist> *attrs = 0,
991 bool oid_existed = true //whether this oid already exists in the backend
992 );
993 void register_snapset_context(SnapSetContext *ssc) {
994 Mutex::Locker l(snapset_contexts_lock);
995 _register_snapset_context(ssc);
996 }
997 void _register_snapset_context(SnapSetContext *ssc) {
998 assert(snapset_contexts_lock.is_locked());
999 if (!ssc->registered) {
1000 assert(snapset_contexts.count(ssc->oid) == 0);
1001 ssc->registered = true;
1002 snapset_contexts[ssc->oid] = ssc;
1003 }
1004 }
1005 void put_snapset_context(SnapSetContext *ssc);
1006
1007 map<hobject_t, ObjectContextRef> recovering;
1008
1009 /*
1010 * Backfill
1011 *
1012 * peer_info[backfill_target].last_backfill == info.last_backfill on the peer.
1013 *
1014 * objects prior to peer_info[backfill_target].last_backfill
1015 * - are on the peer
1016 * - are included in the peer stats
1017 *
1018 * objects \in (last_backfill, last_backfill_started]
1019 * - are on the peer or are in backfills_in_flight
1020 * - are not included in pg stats (yet)
1021 * - have their stats in pending_backfill_updates on the primary
1022 */
1023 set<hobject_t> backfills_in_flight;
1024 map<hobject_t, pg_stat_t> pending_backfill_updates;
1025
1026 void dump_recovery_info(Formatter *f) const override {
1027 f->open_array_section("backfill_targets");
1028 for (set<pg_shard_t>::const_iterator p = backfill_targets.begin();
1029 p != backfill_targets.end(); ++p)
1030 f->dump_stream("replica") << *p;
1031 f->close_section();
1032 f->open_array_section("waiting_on_backfill");
1033 for (set<pg_shard_t>::const_iterator p = waiting_on_backfill.begin();
1034 p != waiting_on_backfill.end(); ++p)
1035 f->dump_stream("osd") << *p;
1036 f->close_section();
1037 f->dump_stream("last_backfill_started") << last_backfill_started;
1038 {
1039 f->open_object_section("backfill_info");
1040 backfill_info.dump(f);
1041 f->close_section();
1042 }
1043 {
1044 f->open_array_section("peer_backfill_info");
1045 for (map<pg_shard_t, BackfillInterval>::const_iterator pbi =
1046 peer_backfill_info.begin();
1047 pbi != peer_backfill_info.end(); ++pbi) {
1048 f->dump_stream("osd") << pbi->first;
1049 f->open_object_section("BackfillInterval");
1050 pbi->second.dump(f);
1051 f->close_section();
1052 }
1053 f->close_section();
1054 }
1055 {
1056 f->open_array_section("backfills_in_flight");
1057 for (set<hobject_t>::const_iterator i = backfills_in_flight.begin();
1058 i != backfills_in_flight.end();
1059 ++i) {
1060 f->dump_stream("object") << *i;
1061 }
1062 f->close_section();
1063 }
1064 {
1065 f->open_array_section("recovering");
1066 for (map<hobject_t, ObjectContextRef>::const_iterator i = recovering.begin();
1067 i != recovering.end();
1068 ++i) {
1069 f->dump_stream("object") << i->first;
1070 }
1071 f->close_section();
1072 }
1073 {
1074 f->open_object_section("pg_backend");
1075 pgbackend->dump_recovery_info(f);
1076 f->close_section();
1077 }
1078 }
1079
1080 /// last backfill operation started
1081 hobject_t last_backfill_started;
1082 bool new_backfill;
1083
1084 int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
1085 PGBackend::RecoveryHandle *h);
1086 int prep_object_replica_deletes(const hobject_t& soid, eversion_t v,
1087 PGBackend::RecoveryHandle *h);
1088
1089 void finish_degraded_object(const hobject_t& oid);
1090
1091 // Cancels/resets pulls from peer
1092 void check_recovery_sources(const OSDMapRef& map) override;
1093
1094 int recover_missing(
1095 const hobject_t& oid,
1096 eversion_t v,
1097 int priority,
1098 PGBackend::RecoveryHandle *h);
1099
1100 // low level ops
1101
1102 void _make_clone(
1103 OpContext *ctx,
1104 PGTransaction* t,
1105 ObjectContextRef obc,
1106 const hobject_t& head, const hobject_t& coid,
1107 object_info_t *poi);
1108 void execute_ctx(OpContext *ctx);
1109 void finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc=true);
1110 void reply_ctx(OpContext *ctx, int err);
1111 void reply_ctx(OpContext *ctx, int err, eversion_t v, version_t uv);
1112 void make_writeable(OpContext *ctx);
1113 void log_op_stats(OpContext *ctx);
1114
1115 void write_update_size_and_usage(object_stat_sum_t& stats, object_info_t& oi,
1116 interval_set<uint64_t>& modified, uint64_t offset,
1117 uint64_t length, bool write_full=false);
1118 void add_interval_usage(interval_set<uint64_t>& s, object_stat_sum_t& st);
1119
1120
1121 enum class cache_result_t {
1122 NOOP,
1123 BLOCKED_FULL,
1124 BLOCKED_PROMOTE,
1125 HANDLED_PROXY,
1126 HANDLED_REDIRECT,
1127 REPLIED_WITH_EAGAIN,
1128 };
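  // Roughly: NOOP means proceed with normal processing; BLOCKED_FULL /
  // BLOCKED_PROMOTE mean the op was queued waiting for cache space or an
  // in-flight promotion; HANDLED_PROXY means it was proxied to the base
  // tier; HANDLED_REDIRECT means the client was told to resend to the base
  // pool; REPLIED_WITH_EAGAIN means an error reply has already been sent.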
1129 cache_result_t maybe_handle_cache_detail(OpRequestRef op,
1130 bool write_ordered,
1131 ObjectContextRef obc, int r,
1132 hobject_t missing_oid,
1133 bool must_promote,
1134 bool in_hit_set,
1135 ObjectContextRef *promote_obc);
1136 cache_result_t maybe_handle_manifest_detail(OpRequestRef op,
1137 bool write_ordered,
1138 ObjectContextRef obc);
1139 bool maybe_handle_manifest(OpRequestRef op,
1140 bool write_ordered,
1141 ObjectContextRef obc) {
1142 return cache_result_t::NOOP != maybe_handle_manifest_detail(
1143 op,
1144 write_ordered,
1145 obc);
1146 }
1147
1148 /**
1149 * This helper function is called from do_op if the ObjectContext lookup fails.
1150 * @returns true if the caching code is handling the Op, false otherwise.
1151 */
1152 bool maybe_handle_cache(OpRequestRef op,
1153 bool write_ordered,
1154 ObjectContextRef obc, int r,
1155 const hobject_t& missing_oid,
1156 bool must_promote,
1157 bool in_hit_set = false) {
1158 return cache_result_t::NOOP != maybe_handle_cache_detail(
1159 op,
1160 write_ordered,
1161 obc,
1162 r,
1163 missing_oid,
1164 must_promote,
1165 in_hit_set,
1166 nullptr);
1167 }
1168
1169 /**
1170 * This helper function checks if a promotion is needed.
1171 */
1172 bool maybe_promote(ObjectContextRef obc,
1173 const hobject_t& missing_oid,
1174 const object_locator_t& oloc,
1175 bool in_hit_set,
1176 uint32_t recency,
1177 OpRequestRef promote_op,
1178 ObjectContextRef *promote_obc = nullptr);
1179 /**
1180 * This helper function tells the client to redirect their request elsewhere.
1181 */
1182 void do_cache_redirect(OpRequestRef op);
1183 /**
1184 * This function attempts to start a promote. Either it succeeds,
1185 * or places op on a wait list. If op is null, failure means that
1186 * this is a noop. If a future user wants to be able to distinguish
1187 * these cases, a return value should be added.
1188 */
1189 void promote_object(
1190 ObjectContextRef obc, ///< [optional] obc
1191 const hobject_t& missing_object, ///< oid (if !obc)
1192 const object_locator_t& oloc, ///< locator for obc|oid
1193 OpRequestRef op, ///< [optional] client op
1194 ObjectContextRef *promote_obc = nullptr ///< [optional] new obc for object
1195 );
1196
1197 int prepare_transaction(OpContext *ctx);
1198 list<pair<OpRequestRef, OpContext*> > in_progress_async_reads;
1199 void complete_read_ctx(int result, OpContext *ctx);
1200
1201 // pg on-disk content
1202 void check_local() override;
1203
1204 void _clear_recovery_state() override;
1205
1206 bool start_recovery_ops(
1207 uint64_t max,
1208 ThreadPool::TPHandle &handle, uint64_t *started) override;
1209
1210 uint64_t recover_primary(uint64_t max, ThreadPool::TPHandle &handle);
1211 uint64_t recover_replicas(uint64_t max, ThreadPool::TPHandle &handle);
1212 hobject_t earliest_peer_backfill() const;
1213 bool all_peer_done() const;
1214 /**
1215 * @param work_started will be set to true if recover_backfill got anywhere
1216 * @returns the number of operations started
1217 */
1218 uint64_t recover_backfill(uint64_t max, ThreadPool::TPHandle &handle,
1219 bool *work_started);
1220
1221 /**
1222 * scan a (hash) range of objects in the current pg
1223 *
1224 * @param min return at least this many items, unless we are done
1225 * @param max return no more than this many items
1226 * @param bi [in,out] resulting map of objects to eversion_t's; the scan
1227 *        starts at bi->begin, so the first item returned is >= that value
1228 */
1229 void scan_range(
1230 int min, int max, BackfillInterval *bi,
1231 ThreadPool::TPHandle &handle
1232 );
1233
1234 /// Update a hash range to reflect changes since the last scan
1235 void update_range(
1236 BackfillInterval *bi, ///< [in,out] interval to update
1237 ThreadPool::TPHandle &handle ///< [in] tp handle
1238 );
1239
1240 int prep_backfill_object_push(
1241 hobject_t oid, eversion_t v, ObjectContextRef obc,
1242 vector<pg_shard_t> peers,
1243 PGBackend::RecoveryHandle *h);
1244 void send_remove_op(const hobject_t& oid, eversion_t v, pg_shard_t peer);
1245
1246
1247 class C_OSD_OndiskWriteUnlock;
1248 class C_OSD_AppliedRecoveredObject;
1249 class C_OSD_CommittedPushedObject;
1250 class C_OSD_AppliedRecoveredObjectReplica;
1251 void sub_op_remove(OpRequestRef op);
1252
1253 void _applied_recovered_object(ObjectContextRef obc);
1254 void _applied_recovered_object_replica();
1255 void _committed_pushed_object(epoch_t epoch, eversion_t lc);
1256 void recover_got(hobject_t oid, eversion_t v);
1257
1258 // -- copyfrom --
1259 map<hobject_t, CopyOpRef> copy_ops;
1260
1261 int do_copy_get(OpContext *ctx, bufferlist::iterator& bp, OSDOp& op,
1262 ObjectContextRef& obc);
1263 int finish_copy_get();
1264
1265 void fill_in_copy_get_noent(OpRequestRef& op, hobject_t oid,
1266 OSDOp& osd_op);
1267
1268 /**
1269 * To copy an object, call start_copy.
1270 *
1271 * @param cb: The CopyCallback to be activated when the copy is complete
1272 * @param obc: The ObjectContext we are copying into
1273 * @param src: The source object
1274 * @param oloc: the source object locator
1275 * @param version: the version of the source object to copy (0 for any)
1276 */
1277 void start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src,
1278 object_locator_t oloc, version_t version, unsigned flags,
1279 bool mirror_snapset, unsigned src_obj_fadvise_flags,
1280 unsigned dest_obj_fadvise_flags);
1281 void process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r);
1282 void _write_copy_chunk(CopyOpRef cop, PGTransaction *t);
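  // get_copy_chunk_size() rounds the per-chunk copy size up to the pool's
  // required append alignment (relevant for erasure-coded pools).  For
  // example, with osd_copyfrom_max_chunk = 1000000 and a hypothetical
  // 65536-byte alignment, 1000000 % 65536 == 16960, so the chunk becomes
  // 1000000 + (65536 - 16960) = 1048576 bytes.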
1283 uint64_t get_copy_chunk_size() const {
1284 uint64_t size = cct->_conf->osd_copyfrom_max_chunk;
1285 if (pool.info.requires_aligned_append()) {
1286 uint64_t alignment = pool.info.required_alignment();
1287 if (size % alignment) {
1288 size += alignment - (size % alignment);
1289 }
1290 }
1291 return size;
1292 }
1293 void _copy_some(ObjectContextRef obc, CopyOpRef cop);
1294 void finish_copyfrom(CopyFromCallback *cb);
1295 void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
1296 void cancel_copy(CopyOpRef cop, bool requeue, vector<ceph_tid_t> *tids);
1297 void cancel_copy_ops(bool requeue, vector<ceph_tid_t> *tids);
1298
1299 friend struct C_Copyfrom;
1300
1301 // -- flush --
1302 map<hobject_t, FlushOpRef> flush_ops;
1303
1304 /// start_flush takes ownership of on_flush iff ret == -EINPROGRESS
1305 int start_flush(
1306 OpRequestRef op, ObjectContextRef obc,
1307 bool blocking, hobject_t *pmissing,
1308 boost::optional<std::function<void()>> &&on_flush);
1309 void finish_flush(hobject_t oid, ceph_tid_t tid, int r);
1310 int try_flush_mark_clean(FlushOpRef fop);
1311 void cancel_flush(FlushOpRef fop, bool requeue, vector<ceph_tid_t> *tids);
1312 void cancel_flush_ops(bool requeue, vector<ceph_tid_t> *tids);
1313
1314 /// @return false if the clone has been evicted
1315 bool is_present_clone(hobject_t coid);
1316
1317 friend struct C_Flush;
1318
1319 // -- scrub --
1320 bool _range_available_for_scrub(
1321 const hobject_t &begin, const hobject_t &end) override;
1322 void scrub_snapshot_metadata(
1323 ScrubMap &map,
1324 const std::map<hobject_t,
1325 pair<boost::optional<uint32_t>,
1326 boost::optional<uint32_t>>> &missing_digest) override;
1327 void _scrub_clear_state() override;
1328 void _scrub_finish() override;
1329 object_stat_collection_t scrub_cstat;
1330
1331 void _split_into(pg_t child_pgid, PG *child,
1332 unsigned split_bits) override;
1333 void apply_and_flush_repops(bool requeue);
1334
1335 void calc_trim_to() override;
1336 int do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr);
1337 int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr);
1338
1339 // -- checksum --
1340 int do_checksum(OpContext *ctx, OSDOp& osd_op, bufferlist::iterator *bl_it);
1341 int finish_checksum(OSDOp& osd_op, Checksummer::CSumType csum_type,
1342 bufferlist::iterator *init_value_bl_it,
1343 const bufferlist &read_bl);
1344
1345 friend class C_ChecksumRead;
1346
1347 int do_extent_cmp(OpContext *ctx, OSDOp& osd_op);
1348 int finish_extent_cmp(OSDOp& osd_op, const bufferlist &read_bl);
1349
1350 friend class C_ExtentCmpRead;
1351
1352 int do_read(OpContext *ctx, OSDOp& osd_op);
1353 int do_sparse_read(OpContext *ctx, OSDOp& osd_op);
1354 int do_writesame(OpContext *ctx, OSDOp& osd_op);
1355
1356 bool pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata);
1357 int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter);
1358
1359 map<hobject_t, list<OpRequestRef>> in_progress_proxy_ops;
1360 void kick_proxy_ops_blocked(hobject_t& soid);
1361 void cancel_proxy_ops(bool requeue, vector<ceph_tid_t> *tids);
1362
1363 // -- proxyread --
1364 map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;
1365
1366 void do_proxy_read(OpRequestRef op, ObjectContextRef obc = NULL);
1367 void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
1368 void cancel_proxy_read(ProxyReadOpRef prdop, vector<ceph_tid_t> *tids);
1369
1370 friend struct C_ProxyRead;
1371
1372 // -- proxywrite --
1373 map<ceph_tid_t, ProxyWriteOpRef> proxywrite_ops;
1374
1375 void do_proxy_write(OpRequestRef op, const hobject_t& missing_oid, ObjectContextRef obc = NULL);
1376 void cancel_and_requeue_proxy_ops(hobject_t oid);
1377 void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r);
1378 void cancel_proxy_write(ProxyWriteOpRef pwop, vector<ceph_tid_t> *tids);
1379
1380 friend struct C_ProxyWrite_Commit;
1381
1382 public:
1383 PrimaryLogPG(OSDService *o, OSDMapRef curmap,
1384 const PGPool &_pool, spg_t p);
1385 ~PrimaryLogPG() override {}
1386
1387 int do_command(
1388 cmdmap_t cmdmap,
1389 ostream& ss,
1390 bufferlist& idata,
1391 bufferlist& odata,
1392 ConnectionRef conn,
1393 ceph_tid_t tid) override;
1394
1395 void do_request(
1396 OpRequestRef& op,
1397 ThreadPool::TPHandle &handle) override;
1398 void do_op(OpRequestRef& op) override;
1399 void record_write_error(OpRequestRef op, const hobject_t &soid,
1400 MOSDOpReply *orig_reply, int r);
1401 void do_pg_op(OpRequestRef op);
1402 void do_sub_op(OpRequestRef op) override;
1403 void do_sub_op_reply(OpRequestRef op) override;
1404 void do_scan(
1405 OpRequestRef op,
1406 ThreadPool::TPHandle &handle) override;
1407 void do_backfill(OpRequestRef op) override;
1408 void do_backfill_remove(OpRequestRef op);
1409
1410 void handle_backoff(OpRequestRef& op);
1411
1412 int trim_object(bool first, const hobject_t &coid, OpContextUPtr *ctxp);
1413 void snap_trimmer(epoch_t e) override;
1414 void kick_snap_trim() override;
1415 void snap_trimmer_scrub_complete() override;
1416 int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops);
1417
1418 int _get_tmap(OpContext *ctx, bufferlist *header, bufferlist *vals);
1419 int do_tmap2omap(OpContext *ctx, unsigned flags);
1420 int do_tmapup(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op);
1421 int do_tmapup_slow(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op, bufferlist& bl);
1422
1423 void do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn);
1424 private:
1425 int do_scrub_ls(MOSDOp *op, OSDOp *osd_op);
1426 hobject_t earliest_backfill() const;
1427 bool check_src_targ(const hobject_t& soid, const hobject_t& toid) const;
1428
1429 uint64_t temp_seq; ///< last id for naming temp objects
1430 /// generate a new temp object name
1431 hobject_t generate_temp_object(const hobject_t& target);
1432 /// generate a new temp object name (for recovery)
1433 hobject_t get_temp_recovery_object(const hobject_t& target,
1434 eversion_t version) override;
1435 int get_recovery_op_priority() const {
1436 int pri = 0;
1437 pool.info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
1438 return pri > 0 ? pri : cct->_conf->osd_recovery_op_priority;
1439 }
1440 void log_missing(unsigned missing,
1441 const boost::optional<hobject_t> &head,
1442 LogChannelRef clog,
1443 const spg_t &pgid,
1444 const char *func,
1445 const char *mode,
1446 bool allow_incomplete_clones);
1447 unsigned process_clones_to(const boost::optional<hobject_t> &head,
1448 const boost::optional<SnapSet> &snapset,
1449 LogChannelRef clog,
1450 const spg_t &pgid,
1451 const char *mode,
1452 bool allow_incomplete_clones,
1453 boost::optional<snapid_t> target,
1454 vector<snapid_t>::reverse_iterator *curclone,
1455 inconsistent_snapset_wrapper &snap_error);
1456
1457 public:
1458 coll_t get_coll() {
1459 return coll;
1460 }
1461 void split_colls(
1462 spg_t child,
1463 int split_bits,
1464 int seed,
1465 const pg_pool_t *pool,
1466 ObjectStore::Transaction *t) override {
1467 coll_t target = coll_t(child);
1468 PG::_create(*t, child, split_bits);
1469 t->split_collection(
1470 coll,
1471 split_bits,
1472 seed,
1473 target);
1474 PG::_init(*t, child, pool);
1475 }
1476 private:
1477
1478 struct DoSnapWork : boost::statechart::event< DoSnapWork > {
1479 DoSnapWork() : boost::statechart::event < DoSnapWork >() {}
1480 };
1481 struct KickTrim : boost::statechart::event< KickTrim > {
1482 KickTrim() : boost::statechart::event < KickTrim >() {}
1483 };
1484 struct RepopsComplete : boost::statechart::event< RepopsComplete > {
1485 RepopsComplete() : boost::statechart::event < RepopsComplete >() {}
1486 };
1487 struct ScrubComplete : boost::statechart::event< ScrubComplete > {
1488 ScrubComplete() : boost::statechart::event < ScrubComplete >() {}
1489 };
1490 struct TrimWriteUnblocked : boost::statechart::event< TrimWriteUnblocked > {
1491 TrimWriteUnblocked() : boost::statechart::event < TrimWriteUnblocked >() {}
1492 };
1493 struct Reset : boost::statechart::event< Reset > {
1494 Reset() : boost::statechart::event< Reset >() {}
1495 };
1496 struct SnapTrimReserved : boost::statechart::event< SnapTrimReserved > {
1497 SnapTrimReserved() : boost::statechart::event< SnapTrimReserved >() {}
1498 };
1499 struct SnapTrimTimerReady : boost::statechart::event< SnapTrimTimerReady > {
1500 SnapTrimTimerReady() : boost::statechart::event< SnapTrimTimerReady >() {}
1501 };
1502
1503 struct NotTrimming;
1504 struct SnapTrimmer : public boost::statechart::state_machine< SnapTrimmer, NotTrimming > {
1505 PrimaryLogPG *pg;
1506 explicit SnapTrimmer(PrimaryLogPG *pg) : pg(pg) {}
1507 void log_enter(const char *state_name);
1508 void log_exit(const char *state_name, utime_t duration);
1509 bool can_trim() {
1510 return pg->is_clean() && !pg->scrubber.active && !pg->snap_trimq.empty();
1511 }
1512 } snap_trimmer_machine;
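  // Rough flow through the trimmer: NotTrimming -(KickTrim)-> Trimming
  // [initial substate WaitReservation] -(SnapTrimReserved)-> AwaitAsyncWork
  // -(DoSnapWork)-> WaitRepops -(RepopsComplete)-> WaitTrimTimer
  // -(SnapTrimTimerReady)-> AwaitAsyncWork ...; a Reset event from any
  // Trimming substate drops back to NotTrimming, cancelling the reservation.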
1513
1514 struct WaitReservation;
1515 struct Trimming : boost::statechart::state< Trimming, SnapTrimmer, WaitReservation >, NamedState {
1516 typedef boost::mpl::list <
1517 boost::statechart::custom_reaction< KickTrim >,
1518 boost::statechart::transition< Reset, NotTrimming >
1519 > reactions;
1520
1521 set<hobject_t> in_flight;
1522 snapid_t snap_to_trim;
1523
1524 explicit Trimming(my_context ctx)
1525 : my_base(ctx),
1526 NamedState(context< SnapTrimmer >().pg, "Trimming") {
1527 context< SnapTrimmer >().log_enter(state_name);
1528 assert(context< SnapTrimmer >().can_trim());
1529 assert(in_flight.empty());
1530 }
1531 void exit() {
1532 context< SnapTrimmer >().log_exit(state_name, enter_time);
1533 auto *pg = context< SnapTrimmer >().pg;
1534 pg->osd->snap_reserver.cancel_reservation(pg->get_pgid());
1535 pg->state_clear(PG_STATE_SNAPTRIM);
1536 pg->publish_stats_to_osd();
1537 }
1538 boost::statechart::result react(const KickTrim&) {
1539 return discard_event();
1540 }
1541 };
1542
1543 /* SnapTrimmerStates */
1544 struct WaitTrimTimer : boost::statechart::state< WaitTrimTimer, Trimming >, NamedState {
1545 typedef boost::mpl::list <
1546 boost::statechart::custom_reaction< SnapTrimTimerReady >
1547 > reactions;
1548 Context *wakeup = nullptr;
1549 explicit WaitTrimTimer(my_context ctx)
1550 : my_base(ctx),
1551 NamedState(context< SnapTrimmer >().pg, "Trimming/WaitTrimTimer") {
1552 context< SnapTrimmer >().log_enter(state_name);
1553 assert(context<Trimming>().in_flight.empty());
1554 struct OnTimer : Context {
1555 PrimaryLogPGRef pg;
1556 epoch_t epoch;
1557 OnTimer(PrimaryLogPGRef pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
1558 void finish(int) override {
1559 pg->lock();
1560 if (!pg->pg_has_reset_since(epoch))
1561 pg->snap_trimmer_machine.process_event(SnapTrimTimerReady());
1562 pg->unlock();
1563 }
1564 };
1565 auto *pg = context< SnapTrimmer >().pg;
1566 if (pg->cct->_conf->osd_snap_trim_sleep > 0) {
1567 Mutex::Locker l(pg->osd->snap_sleep_lock);
1568 wakeup = pg->osd->snap_sleep_timer.add_event_after(
1569 pg->cct->_conf->osd_snap_trim_sleep,
1570 new OnTimer{pg, pg->get_osdmap()->get_epoch()});
1571 } else {
1572 post_event(SnapTrimTimerReady());
1573 }
1574 }
1575 void exit() {
1576 context< SnapTrimmer >().log_exit(state_name, enter_time);
1577 auto *pg = context< SnapTrimmer >().pg;
1578 if (wakeup) {
1579 Mutex::Locker l(pg->osd->snap_sleep_lock);
1580 pg->osd->snap_sleep_timer.cancel_event(wakeup);
1581 wakeup = nullptr;
1582 }
1583 }
1584 boost::statechart::result react(const SnapTrimTimerReady &) {
1585 wakeup = nullptr;
1586 if (!context< SnapTrimmer >().can_trim()) {
1587 post_event(KickTrim());
1588 return transit< NotTrimming >();
1589 } else {
1590 return transit< AwaitAsyncWork >();
1591 }
1592 }
1593 };
1594
1595 struct WaitRWLock : boost::statechart::state< WaitRWLock, Trimming >, NamedState {
1596 typedef boost::mpl::list <
1597 boost::statechart::custom_reaction< TrimWriteUnblocked >
1598 > reactions;
1599 explicit WaitRWLock(my_context ctx)
1600 : my_base(ctx),
1601 NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRWLock") {
1602 context< SnapTrimmer >().log_enter(state_name);
1603 assert(context<Trimming>().in_flight.empty());
1604 }
1605 void exit() {
1606 context< SnapTrimmer >().log_exit(state_name, enter_time);
1607 }
1608 boost::statechart::result react(const TrimWriteUnblocked&) {
1609 if (!context< SnapTrimmer >().can_trim()) {
1610 post_event(KickTrim());
1611 return transit< NotTrimming >();
1612 } else {
1613 return transit< AwaitAsyncWork >();
1614 }
1615 }
1616 };
1617
1618 struct WaitRepops : boost::statechart::state< WaitRepops, Trimming >, NamedState {
1619 typedef boost::mpl::list <
1620 boost::statechart::custom_reaction< RepopsComplete >
1621 > reactions;
1622 explicit WaitRepops(my_context ctx)
1623 : my_base(ctx),
1624 NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRepops") {
1625 context< SnapTrimmer >().log_enter(state_name);
1626 assert(!context<Trimming>().in_flight.empty());
1627 }
1628 void exit() {
1629 context< SnapTrimmer >().log_exit(state_name, enter_time);
1630 }
1631 boost::statechart::result react(const RepopsComplete&) {
1632 if (!context< SnapTrimmer >().can_trim()) {
1633 post_event(KickTrim());
1634 return transit< NotTrimming >();
1635 } else {
1636 return transit< WaitTrimTimer >();
1637 }
1638 }
1639 };
1640
1641 struct AwaitAsyncWork : boost::statechart::state< AwaitAsyncWork, Trimming >, NamedState {
1642 typedef boost::mpl::list <
1643 boost::statechart::custom_reaction< DoSnapWork >
1644 > reactions;
1645 explicit AwaitAsyncWork(my_context ctx);
1646 void exit() {
1647 context< SnapTrimmer >().log_exit(state_name, enter_time);
1648 }
1649 boost::statechart::result react(const DoSnapWork&);
1650 };
1651
1652 struct WaitReservation : boost::statechart::state< WaitReservation, Trimming >, NamedState {
1653 /* WaitReservation is a sub-state of trimming simply so that exiting Trimming
1654 * always cancels the reservation */
1655 typedef boost::mpl::list <
1656 boost::statechart::custom_reaction< SnapTrimReserved >
1657 > reactions;
1658 struct ReservationCB : public Context {
1659 PrimaryLogPGRef pg;
1660 bool canceled;
1661 ReservationCB(PrimaryLogPG *pg) : pg(pg), canceled(false) {}
1662 void finish(int) override {
1663 pg->lock();
1664 if (!canceled)
1665 pg->snap_trimmer_machine.process_event(SnapTrimReserved());
1666 pg->unlock();
1667 }
1668 void cancel() {
1669 assert(pg->is_locked());
1670 assert(!canceled);
1671 canceled = true;
1672 }
1673 };
1674 ReservationCB *pending = nullptr;
1675
1676 explicit WaitReservation(my_context ctx)
1677 : my_base(ctx),
1678 NamedState(context< SnapTrimmer >().pg, "Trimming/WaitReservation") {
1679 context< SnapTrimmer >().log_enter(state_name);
1680 assert(context<Trimming>().in_flight.empty());
1681 auto *pg = context< SnapTrimmer >().pg;
1682 pending = new ReservationCB(pg);
1683 pg->osd->snap_reserver.request_reservation(
1684 pg->get_pgid(),
1685 pending,
1686 0);
1687 pg->state_set(PG_STATE_SNAPTRIM_WAIT);
1688 pg->publish_stats_to_osd();
1689 }
1690 boost::statechart::result react(const SnapTrimReserved&);
1691 void exit() {
1692 context< SnapTrimmer >().log_exit(state_name, enter_time);
1693 if (pending)
1694 pending->cancel();
1695 pending = nullptr;
1696 auto *pg = context< SnapTrimmer >().pg;
1697 pg->state_clear(PG_STATE_SNAPTRIM_WAIT);
1698 pg->state_clear(PG_STATE_SNAPTRIM_ERROR);
1699 pg->publish_stats_to_osd();
1700 }
1701 };
1702
1703 struct WaitScrub : boost::statechart::state< WaitScrub, SnapTrimmer >, NamedState {
1704 typedef boost::mpl::list <
1705 boost::statechart::custom_reaction< ScrubComplete >,
1706 boost::statechart::custom_reaction< KickTrim >,
1707 boost::statechart::transition< Reset, NotTrimming >
1708 > reactions;
1709 explicit WaitScrub(my_context ctx)
1710 : my_base(ctx),
1711 NamedState(context< SnapTrimmer >().pg, "Trimming/WaitScrub") {
1712 context< SnapTrimmer >().log_enter(state_name);
1713 }
1714 void exit() {
1715 context< SnapTrimmer >().log_exit(state_name, enter_time);
1716 }
1717 boost::statechart::result react(const ScrubComplete&) {
1718 post_event(KickTrim());
1719 return transit< NotTrimming >();
1720 }
1721 boost::statechart::result react(const KickTrim&) {
1722 return discard_event();
1723 }
1724 };
1725
1726 struct NotTrimming : boost::statechart::state< NotTrimming, SnapTrimmer >, NamedState {
1727 typedef boost::mpl::list <
1728 boost::statechart::custom_reaction< KickTrim >,
1729 boost::statechart::transition< Reset, NotTrimming >
1730 > reactions;
1731 explicit NotTrimming(my_context ctx);
1732 void exit();
1733 boost::statechart::result react(const KickTrim&);
1734 };
1735
1736 int _verify_no_head_clones(const hobject_t& soid,
1737 const SnapSet& ss);
1738 // ensure the object exists in ctx->new_obs, creating it (or clearing an
1739 // existing whiteout) as needed; no change if it already exists.
1740 void maybe_create_new_object(OpContext *ctx, bool ignore_transaction=false);
1741 int _delete_oid(OpContext *ctx, bool no_whiteout, bool try_no_whiteout);
1742 int _rollback_to(OpContext *ctx, ceph_osd_op& op);
1743 public:
1744 bool is_missing_object(const hobject_t& oid) const;
1745 bool is_unreadable_object(const hobject_t &oid) const {
1746 return is_missing_object(oid) ||
1747 !missing_loc.readable_with_acting(oid, actingset);
1748 }
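  // Roughly: "missing" means the primary itself does not have the object;
  // "unreadable" means no read can be served from the acting set yet;
  // "degraded"/backfilling (below) means some replica or backfill target
  // still needs the object, so writes to it must wait for recovery.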
1749 void maybe_kick_recovery(const hobject_t &soid);
1750 void wait_for_unreadable_object(const hobject_t& oid, OpRequestRef op);
1751 void wait_for_all_missing(OpRequestRef op);
1752
1753 bool is_degraded_or_backfilling_object(const hobject_t& oid);
1754 void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
1755
1756 void block_write_on_full_cache(
1757 const hobject_t& oid, OpRequestRef op);
1758 void block_for_clean(
1759 const hobject_t& oid, OpRequestRef op);
1760 void block_write_on_snap_rollback(
1761 const hobject_t& oid, ObjectContextRef obc, OpRequestRef op);
1762 void block_write_on_degraded_snap(const hobject_t& oid, OpRequestRef op);
1763
1764 bool maybe_await_blocked_snapset(const hobject_t &soid, OpRequestRef op);
1765 void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
1766 void kick_object_context_blocked(ObjectContextRef obc);
1767
1768 void maybe_force_recovery();
1769
1770 void mark_all_unfound_lost(
1771 int what,
1772 ConnectionRef con,
1773 ceph_tid_t tid);
1774 eversion_t pick_newest_available(const hobject_t& oid);
1775
1776 void do_update_log_missing(
1777 OpRequestRef &op);
1778
1779 void do_update_log_missing_reply(
1780 OpRequestRef &op);
1781
1782 void on_role_change() override;
1783 void on_pool_change() override;
1784 void _on_new_interval() override;
1785 void clear_async_reads();
1786 void on_change(ObjectStore::Transaction *t) override;
1787 void on_activate() override;
1788 void on_flushed() override;
1789 void on_removal(ObjectStore::Transaction *t) override;
1790 void on_shutdown() override;
1791 bool check_failsafe_full(ostream &ss) override;
1792 bool check_osdmap_full(const set<pg_shard_t> &missing_on) override;
1793 bool maybe_preempt_replica_scrub(const hobject_t& oid) override {
1794 return write_blocked_by_scrub(oid);
1795 }
1796 int rep_repair_primary_object(const hobject_t& soid, OpRequestRef op);
1797
1798 // attr cache handling
1799 void setattr_maybe_cache(
1800 ObjectContextRef obc,
1801 OpContext *op,
1802 PGTransaction *t,
1803 const string &key,
1804 bufferlist &val);
1805 void setattrs_maybe_cache(
1806 ObjectContextRef obc,
1807 OpContext *op,
1808 PGTransaction *t,
1809 map<string, bufferlist> &attrs);
1810 void rmattr_maybe_cache(
1811 ObjectContextRef obc,
1812 OpContext *op,
1813 PGTransaction *t,
1814 const string &key);
1815 int getattr_maybe_cache(
1816 ObjectContextRef obc,
1817 const string &key,
1818 bufferlist *val);
1819 int getattrs_maybe_cache(
1820 ObjectContextRef obc,
1821 map<string, bufferlist> *out);
1822 };
1823
1824 inline ostream& operator<<(ostream& out, const PrimaryLogPG::RepGather& repop)
1825 {
1826 out << "repgather(" << &repop
1827 << " " << repop.v
1828 << " rep_tid=" << repop.rep_tid
1829 << " committed?=" << repop.all_committed
1830 << " applied?=" << repop.all_applied
1831 << " r=" << repop.r
1832 << ")";
1833 return out;
1834 }
1835
1836 inline ostream& operator<<(ostream& out,
1837 const PrimaryLogPG::ProxyWriteOpRef& pwop)
1838 {
1839 out << "proxywrite(" << &pwop
1840 << " " << pwop->user_version
1841 << " pwop_tid=" << pwop->objecter_tid;
1842 if (pwop->ctx->op)
1843 out << " op=" << *(pwop->ctx->op->get_req());
1844 out << ")";
1845 return out;
1846 }
1847
1848 void intrusive_ptr_add_ref(PrimaryLogPG::RepGather *repop);
1849 void intrusive_ptr_release(PrimaryLogPG::RepGather *repop);
1850
1851
1852 #endif