1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 /*
3 * Ceph - scalable distributed file system
4 *
5 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
6 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
7 *
8 * Author: Loic Dachary <loic@dachary.org>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_REPLICATEDPG_H
18 #define CEPH_REPLICATEDPG_H
19
20 #include <boost/tuple/tuple.hpp>
21 #include "include/assert.h"
22 #include "PG.h"
23 #include "Watch.h"
24 #include "TierAgentState.h"
25 #include "messages/MOSDOpReply.h"
26 #include "common/Checksummer.h"
27 #include "common/sharedptr_registry.hpp"
28 #include "ReplicatedBackend.h"
29 #include "PGTransaction.h"
30
31 class CopyFromCallback;
32 class PromoteCallback;
33
34 class PrimaryLogPG;
35 class PGLSFilter;
36 class HitSet;
37 struct TierAgentState;
38 class MOSDOp;
39 class MOSDOpReply;
40 class OSDService;
41
42 void intrusive_ptr_add_ref(PrimaryLogPG *pg);
43 void intrusive_ptr_release(PrimaryLogPG *pg);
44 uint64_t get_with_id(PrimaryLogPG *pg);
45 void put_with_id(PrimaryLogPG *pg, uint64_t id);
46
47 #ifdef PG_DEBUG_REFS
48 typedef TrackedIntPtr<PrimaryLogPG> PrimaryLogPGRef;
49 #else
50 typedef boost::intrusive_ptr<PrimaryLogPG> PrimaryLogPGRef;
51 #endif
52
53 struct inconsistent_snapset_wrapper;
54
55 class PrimaryLogPG : public PG, public PGBackend::Listener {
56 friend class OSD;
57 friend class Watch;
58
59 public:
60 MEMPOOL_CLASS_HELPERS();
61
62 /*
63 * state associated with a copy operation
64 */
65 struct OpContext;
66 class CopyCallback;
67
68 /**
69 * CopyResults stores the object metadata of interest to a copy initiator.
70 */
71 struct CopyResults {
72 ceph::real_time mtime; ///< the copy source's mtime
73 uint64_t object_size; ///< the copied object's size
74 bool started_temp_obj; ///< true if the callback needs to delete temp object
75 hobject_t temp_oid; ///< temp object (if any)
76
77 /**
78 * Function to fill in transaction; if non-empty the callback
79 * must execute it before any other accesses to the object
80 * (in order to complete the copy).
81 */
82 std::function<void(PGTransaction *)> fill_in_final_tx;
83
84 version_t user_version; ///< The copy source's user version
85 bool should_requeue; ///< op should be requeued on cancel
86 vector<snapid_t> snaps; ///< src's snaps (if clone)
87 snapid_t snap_seq; ///< src's snap_seq (if head)
88 librados::snap_set_t snapset; ///< src snapset (if head)
89 bool mirror_snapset;
90 bool has_omap;
91 uint32_t flags; // object_copy_data_t::FLAG_*
92 uint32_t source_data_digest, source_omap_digest;
93 uint32_t data_digest, omap_digest;
94 mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
95 map<string, bufferlist> attrs; // xattrs
96 uint64_t truncate_seq;
97 uint64_t truncate_size;
98 bool is_data_digest() {
99 return flags & object_copy_data_t::FLAG_DATA_DIGEST;
100 }
101 bool is_omap_digest() {
102 return flags & object_copy_data_t::FLAG_OMAP_DIGEST;
103 }
104 CopyResults()
105 : object_size(0), started_temp_obj(false),
106 user_version(0),
107 should_requeue(false), mirror_snapset(false),
108 has_omap(false),
109 flags(0),
110 source_data_digest(-1), source_omap_digest(-1),
111 data_digest(-1), omap_digest(-1),
112 truncate_seq(0), truncate_size(0)
113 {}
114 };
115
116 struct CopyOp {
117 CopyCallback *cb;
118 ObjectContextRef obc;
119 hobject_t src;
120 object_locator_t oloc;
121 unsigned flags;
122 bool mirror_snapset;
123
124 CopyResults results;
125
126 ceph_tid_t objecter_tid;
127 ceph_tid_t objecter_tid2;
128
129 object_copy_cursor_t cursor;
130 map<string,bufferlist> attrs;
131 bufferlist data;
132 bufferlist omap_header;
133 bufferlist omap_data;
134 int rval;
135
136 object_copy_cursor_t temp_cursor;
137
138 /*
139  * A CopyOp proceeds in two steps:
140  *  step 1: read the data (attrs/omap/data) from the source object
141  *  step 2: use that data to create the new (destination) object
142  * src_obj_fadvise_flags is used in step 1;
143  * dest_obj_fadvise_flags is used in step 2.
144 */
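     // The fadvise flags are CEPH_OSD_OP_FLAG_FADVISE_* hints that are passed
     // through to the object store; for example, a caller may set
     // CEPH_OSD_OP_FLAG_FADVISE_DONTNEED on a bulk copy to avoid polluting
     // the cache (illustrative; the exact hints depend on the caller).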
145 unsigned src_obj_fadvise_flags;
146 unsigned dest_obj_fadvise_flags;
147
148 CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s,
149 object_locator_t l,
150 version_t v,
151 unsigned f,
152 bool ms,
153 unsigned src_obj_fadvise_flags,
154 unsigned dest_obj_fadvise_flags)
155 : cb(cb_), obc(_obc), src(s), oloc(l), flags(f),
156 mirror_snapset(ms),
157 objecter_tid(0),
158 objecter_tid2(0),
159 rval(-1),
160 src_obj_fadvise_flags(src_obj_fadvise_flags),
161 dest_obj_fadvise_flags(dest_obj_fadvise_flags)
162 {
163 results.user_version = v;
164 results.mirror_snapset = mirror_snapset;
165 }
166 };
167 typedef ceph::shared_ptr<CopyOp> CopyOpRef;
168
169 /**
170 * The CopyCallback class defines an interface for completions to the
171 * copy_start code. Users of the copy infrastructure must implement
172 * one and give an instance of the class to start_copy.
173 *
174 * The implementer is responsible for making sure that the CopyCallback
175 * can associate itself with the correct copy operation.
176 */
177 typedef boost::tuple<int, CopyResults*> CopyCallbackResults;
178
179 friend class CopyFromCallback;
180 friend class PromoteCallback;
181
182 struct ProxyReadOp {
183 OpRequestRef op;
184 hobject_t soid;
185 ceph_tid_t objecter_tid;
186 vector<OSDOp> &ops;
187 version_t user_version;
188 int data_offset;
189 bool canceled; ///< true if canceled
190
191 ProxyReadOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops)
192 : op(_op), soid(oid),
193 objecter_tid(0), ops(_ops),
194 user_version(0), data_offset(0),
195 canceled(false) { }
196 };
197 typedef ceph::shared_ptr<ProxyReadOp> ProxyReadOpRef;
198
199 struct ProxyWriteOp {
200 OpContext *ctx;
201 OpRequestRef op;
202 hobject_t soid;
203 ceph_tid_t objecter_tid;
204 vector<OSDOp> &ops;
205 version_t user_version;
206 bool sent_reply;
207 utime_t mtime;
208 bool canceled;
209 osd_reqid_t reqid;
210
211 ProxyWriteOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops, osd_reqid_t _reqid)
212 : ctx(NULL), op(_op), soid(oid),
213 objecter_tid(0), ops(_ops),
214 user_version(0), sent_reply(false),
215 canceled(false),
216 reqid(_reqid) { }
217 };
218 typedef ceph::shared_ptr<ProxyWriteOp> ProxyWriteOpRef;
219
220 struct FlushOp {
221 ObjectContextRef obc; ///< obc we are flushing
222 OpRequestRef op; ///< initiating op
223 list<OpRequestRef> dup_ops; ///< bandwagon jumpers
224 version_t flushed_version; ///< user version we are flushing
225 ceph_tid_t objecter_tid; ///< copy-from request tid
226 int rval; ///< copy-from result
227 bool blocking; ///< whether we are blocking updates
228 bool removal; ///< we are removing the backend object
229 boost::optional<std::function<void()>> on_flush; ///< callback, may be null
230
231 FlushOp()
232 : flushed_version(0), objecter_tid(0), rval(0),
233 blocking(false), removal(false) {}
234 ~FlushOp() { assert(!on_flush); }
235 };
236 typedef ceph::shared_ptr<FlushOp> FlushOpRef;
237
238 boost::scoped_ptr<PGBackend> pgbackend;
239 PGBackend *get_pgbackend() override {
240 return pgbackend.get();
241 }
242
243 /// Listener methods
244 DoutPrefixProvider *get_dpp() override {
245 return this;
246 }
247
248 void on_local_recover(
249 const hobject_t &oid,
250 const ObjectRecoveryInfo &recovery_info,
251 ObjectContextRef obc,
252 ObjectStore::Transaction *t
253 ) override;
254 void on_peer_recover(
255 pg_shard_t peer,
256 const hobject_t &oid,
257 const ObjectRecoveryInfo &recovery_info
258 ) override;
259 void begin_peer_recover(
260 pg_shard_t peer,
261 const hobject_t oid) override;
262 void on_global_recover(
263 const hobject_t &oid,
264 const object_stat_sum_t &stat_diff) override;
265 void failed_push(const list<pg_shard_t> &from, const hobject_t &soid) override;
266 void primary_failed(const hobject_t &soid) override;
267 bool primary_error(const hobject_t& soid, eversion_t v) override;
268 void cancel_pull(const hobject_t &soid) override;
269 void apply_stats(
270 const hobject_t &soid,
271 const object_stat_sum_t &delta_stats) override;
272 void on_primary_error(const hobject_t &oid, eversion_t v) override;
273
274 template<class T> class BlessedGenContext;
275 class BlessedContext;
276 Context *bless_context(Context *c) override;
277
278 GenContext<ThreadPool::TPHandle&> *bless_gencontext(
279 GenContext<ThreadPool::TPHandle&> *c) override;
280
281 void send_message(int to_osd, Message *m) override {
282 osd->send_message_osd_cluster(to_osd, m, get_osdmap()->get_epoch());
283 }
284 void queue_transaction(ObjectStore::Transaction&& t,
285 OpRequestRef op) override {
286 osd->store->queue_transaction(osr.get(), std::move(t), 0, 0, 0, op);
287 }
288 void queue_transactions(vector<ObjectStore::Transaction>& tls,
289 OpRequestRef op) override {
290 osd->store->queue_transactions(osr.get(), tls, 0, 0, 0, op, NULL);
291 }
292 epoch_t get_epoch() const override {
293 return get_osdmap()->get_epoch();
294 }
295 epoch_t get_interval_start_epoch() const override {
296 return info.history.same_interval_since;
297 }
298 epoch_t get_last_peering_reset_epoch() const override {
299 return get_last_peering_reset();
300 }
301 const set<pg_shard_t> &get_actingbackfill_shards() const override {
302 return actingbackfill;
303 }
304 const set<pg_shard_t> &get_acting_shards() const override {
305 return actingset;
306 }
307 const set<pg_shard_t> &get_backfill_shards() const override {
308 return backfill_targets;
309 }
310
311 std::string gen_dbg_prefix() const override { return gen_prefix(); }
312
313 const map<hobject_t, set<pg_shard_t>>
314 &get_missing_loc_shards() const override {
315 return missing_loc.get_missing_locs();
316 }
317 const map<pg_shard_t, pg_missing_t> &get_shard_missing() const override {
318 return peer_missing;
319 }
320 using PGBackend::Listener::get_shard_missing;
321 const map<pg_shard_t, pg_info_t> &get_shard_info() const override {
322 return peer_info;
323 }
324 using PGBackend::Listener::get_shard_info;
325 const pg_missing_tracker_t &get_local_missing() const override {
326 return pg_log.get_missing();
327 }
328 const PGLog &get_log() const override {
329 return pg_log;
330 }
331 bool pgb_is_primary() const override {
332 return is_primary();
333 }
334 OSDMapRef pgb_get_osdmap() const override {
335 return get_osdmap();
336 }
337 const pg_info_t &get_info() const override {
338 return info;
339 }
340 const pg_pool_t &get_pool() const override {
341 return pool.info;
342 }
343
344 ObjectContextRef get_obc(
345 const hobject_t &hoid,
346 const map<string, bufferlist> &attrs) override {
347 return get_object_context(hoid, true, &attrs);
348 }
349
350 bool try_lock_for_read(
351 const hobject_t &hoid,
352 ObcLockManager &manager) override {
353 if (is_missing_object(hoid))
354 return false;
355 auto obc = get_object_context(hoid, false, nullptr);
356 if (!obc)
357 return false;
358 return manager.try_get_read_lock(hoid, obc);
359 }
360
361 void release_locks(ObcLockManager &manager) override {
362 release_object_locks(manager);
363 }
364
365 void pgb_set_object_snap_mapping(
366 const hobject_t &soid,
367 const set<snapid_t> &snaps,
368 ObjectStore::Transaction *t) override {
369 return update_object_snap_mapping(t, soid, snaps);
370 }
371 void pgb_clear_object_snap_mapping(
372 const hobject_t &soid,
373 ObjectStore::Transaction *t) override {
374 return clear_object_snap_mapping(t, soid);
375 }
376
377 void log_operation(
378 const vector<pg_log_entry_t> &logv,
379 const boost::optional<pg_hit_set_history_t> &hset_history,
380 const eversion_t &trim_to,
381 const eversion_t &roll_forward_to,
382 bool transaction_applied,
383 ObjectStore::Transaction &t) override {
384 if (hset_history) {
385 info.hit_set = *hset_history;
386 }
387 append_log(logv, trim_to, roll_forward_to, t, transaction_applied);
388 }
389
390 struct C_OSD_OnApplied;
391 void op_applied(
392 const eversion_t &applied_version) override;
393
394 bool should_send_op(
395 pg_shard_t peer,
396 const hobject_t &hoid) override {
397 if (peer == get_primary())
398 return true;
399 assert(peer_info.count(peer));
400 bool should_send =
401 hoid.pool != (int64_t)info.pgid.pool() ||
402 hoid <= last_backfill_started ||
403 hoid <= peer_info[peer].last_backfill;
404 if (!should_send)
405 assert(is_backfill_targets(peer));
406 return should_send;
407 }
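     // Informal summary: an op is pushed to a backfill target only if the
     // object is already within that peer's backfilled range (or within the
     // range we have started backfilling); anything beyond that will be
     // copied by backfill itself later, so the op is not sent.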
408
409 void update_peer_last_complete_ondisk(
410 pg_shard_t fromosd,
411 eversion_t lcod) override {
412 peer_last_complete_ondisk[fromosd] = lcod;
413 }
414
415 void update_last_complete_ondisk(
416 eversion_t lcod) override {
417 last_complete_ondisk = lcod;
418 }
419
420 void update_stats(
421 const pg_stat_t &stat) override {
422 info.stats = stat;
423 }
424
425 void schedule_recovery_work(
426 GenContext<ThreadPool::TPHandle&> *c) override;
427
428 pg_shard_t whoami_shard() const override {
429 return pg_whoami;
430 }
431 spg_t primary_spg_t() const override {
432 return spg_t(info.pgid.pgid, primary.shard);
433 }
434 pg_shard_t primary_shard() const override {
435 return primary;
436 }
437 uint64_t min_peer_features() const override {
438 return get_min_peer_features();
439 }
440
441 void send_message_osd_cluster(
442 int peer, Message *m, epoch_t from_epoch) override;
443 void send_message_osd_cluster(
444 Message *m, Connection *con) override;
445 void send_message_osd_cluster(
446 Message *m, const ConnectionRef& con) override;
447 ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch) override;
448 entity_name_t get_cluster_msgr_name() override {
449 return osd->get_cluster_msgr_name();
450 }
451
452 PerfCounters *get_logger() override;
453
454 ceph_tid_t get_tid() override { return osd->get_tid(); }
455
456 LogClientTemp clog_error() override { return osd->clog->error(); }
457
458 struct watch_disconnect_t {
459 uint64_t cookie;
460 entity_name_t name;
461 bool send_disconnect;
462 watch_disconnect_t(uint64_t c, entity_name_t n, bool sd)
463 : cookie(c), name(n), send_disconnect(sd) {}
464 };
465 void complete_disconnect_watches(
466 ObjectContextRef obc,
467 const list<watch_disconnect_t> &to_disconnect);
468
469 /*
470 * Capture all object state associated with an in-progress read or write.
471 */
472 struct OpContext {
473 OpRequestRef op;
474 osd_reqid_t reqid;
475 vector<OSDOp> &ops;
476
477 const ObjectState *obs; // Old objectstate
478 const SnapSet *snapset; // Old snapset
479
480 ObjectState new_obs; // resulting ObjectState
481 SnapSet new_snapset; // resulting SnapSet (in case of a write)
482 //pg_stat_t new_stats; // resulting Stats
483 object_stat_sum_t delta_stats;
484
485 bool modify; // (force) modification (even if op_t is empty)
486 bool user_modify; // user-visible modification
487 bool undirty; // user explicitly un-dirtying this object
488 bool cache_evict; ///< true if this is a cache eviction
489 bool ignore_cache; ///< true if IGNORE_CACHE flag is set
490 bool ignore_log_op_stats; // don't log op stats
491 bool update_log_only; ///< this is a write that returned an error - just record in pg log for dup detection
492
493 // side effects
494 list<pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
495 list<watch_disconnect_t> watch_disconnects; ///< old watch + send_discon
496 list<notify_info_t> notifies;
497 struct NotifyAck {
498 boost::optional<uint64_t> watch_cookie;
499 uint64_t notify_id;
500 bufferlist reply_bl;
501 explicit NotifyAck(uint64_t notify_id) : notify_id(notify_id) {}
502 NotifyAck(uint64_t notify_id, uint64_t cookie, bufferlist& rbl)
503 : watch_cookie(cookie), notify_id(notify_id) {
504 reply_bl.claim(rbl);
505 }
506 };
507 list<NotifyAck> notify_acks;
508
509 uint64_t bytes_written, bytes_read;
510
511 utime_t mtime;
512 SnapContext snapc; // writer snap context
513 eversion_t at_version; // pg's current version pointer
514 version_t user_at_version; // pg's current user version pointer
515
516 int current_osd_subop_num;
517
518 PGTransactionUPtr op_t;
519 vector<pg_log_entry_t> log;
520 boost::optional<pg_hit_set_history_t> updated_hset_history;
521
522 interval_set<uint64_t> modified_ranges;
523 ObjectContextRef obc;
524 ObjectContextRef clone_obc; // if we created a clone
525 ObjectContextRef snapset_obc; // if we created/deleted a snapdir
526
527 int data_off; // FIXME: we may want to kill this msgr hint off at some point!
528
529 MOSDOpReply *reply;
530
531 PrimaryLogPG *pg;
532
533 int num_read; ///< count read ops
534 int num_write; ///< count update ops
535
536 mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > extra_reqids;
537
538 CopyFromCallback *copy_cb;
539
540 hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking
541
542 list<std::function<void()>> on_applied;
543 list<std::function<void()>> on_committed;
544 list<std::function<void()>> on_finish;
545 list<std::function<void()>> on_success;
546 template <typename F>
547 void register_on_finish(F &&f) {
548 on_finish.emplace_back(std::forward<F>(f));
549 }
550 template <typename F>
551 void register_on_success(F &&f) {
552 on_success.emplace_back(std::forward<F>(f));
553 }
554 template <typename F>
555 void register_on_applied(F &&f) {
556 on_applied.emplace_back(std::forward<F>(f));
557 }
558 template <typename F>
559 void register_on_commit(F &&f) {
560 on_committed.emplace_back(std::forward<F>(f));
561 }
562
563 bool sent_reply;
564
565 // pending async reads <off, len, op_flags> -> <outbl, outr>
566 list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
567 pair<bufferlist*, Context*> > > pending_async_reads;
568 int async_read_result;
569 int inflightreads;
570 friend struct OnReadComplete;
571 void start_async_reads(PrimaryLogPG *pg);
572 void finish_read(PrimaryLogPG *pg);
573 bool async_reads_complete() {
574 return inflightreads == 0;
575 }
576
577 ObjectContext::RWState::State lock_type;
578 ObcLockManager lock_manager;
579
580 OpContext(const OpContext& other);
581 const OpContext& operator=(const OpContext& other);
582
583 OpContext(OpRequestRef _op, osd_reqid_t _reqid, vector<OSDOp>& _ops,
584 ObjectContextRef& obc,
585 PrimaryLogPG *_pg) :
586 op(_op), reqid(_reqid), ops(_ops),
587 obs(&obc->obs),
588 snapset(0),
589 new_obs(obs->oi, obs->exists),
590 modify(false), user_modify(false), undirty(false), cache_evict(false),
591 ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
592 bytes_written(0), bytes_read(0), user_at_version(0),
593 current_osd_subop_num(0),
594 obc(obc),
595 data_off(0), reply(NULL), pg(_pg),
596 num_read(0),
597 num_write(0),
598 copy_cb(NULL),
599 sent_reply(false),
600 async_read_result(0),
601 inflightreads(0),
602 lock_type(ObjectContext::RWState::RWNONE) {
603 if (obc->ssc) {
604 new_snapset = obc->ssc->snapset;
605 snapset = &obc->ssc->snapset;
606 }
607 }
608 OpContext(OpRequestRef _op, osd_reqid_t _reqid,
609 vector<OSDOp>& _ops, PrimaryLogPG *_pg) :
610 op(_op), reqid(_reqid), ops(_ops), obs(NULL), snapset(0),
611 modify(false), user_modify(false), undirty(false), cache_evict(false),
612 ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
613 bytes_written(0), bytes_read(0), user_at_version(0),
614 current_osd_subop_num(0),
615 data_off(0), reply(NULL), pg(_pg),
616 num_read(0),
617 num_write(0),
618 copy_cb(NULL),
619 async_read_result(0),
620 inflightreads(0),
621 lock_type(ObjectContext::RWState::RWNONE) {}
622 void reset_obs(ObjectContextRef obc) {
623 new_obs = ObjectState(obc->obs.oi, obc->obs.exists);
624 if (obc->ssc) {
625 new_snapset = obc->ssc->snapset;
626 snapset = &obc->ssc->snapset;
627 }
628 }
629 ~OpContext() {
630 assert(!op_t);
631 if (reply)
632 reply->put();
633 for (list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
634 pair<bufferlist*, Context*> > >::iterator i =
635 pending_async_reads.begin();
636 i != pending_async_reads.end();
637 pending_async_reads.erase(i++)) {
638 delete i->second.second;
639 }
640 }
641 uint64_t get_features() {
642 if (op && op->get_req()) {
643 return op->get_req()->get_connection()->get_features();
644 }
645 return -1ull;
646 }
647 };
648 using OpContextUPtr = std::unique_ptr<OpContext>;
649 friend struct OpContext;
650
651 /*
652 * State on the PG primary associated with the replicated mutation
653 */
654 class RepGather {
655 public:
656 hobject_t hoid;
657 OpRequestRef op;
658 xlist<RepGather*>::item queue_item;
659 int nref;
660
661 eversion_t v;
662 int r = 0;
663
664 ceph_tid_t rep_tid;
665
666 bool rep_aborted, rep_done;
667
668 bool all_applied;
669 bool all_committed;
670 const bool applies_with_commit;
671
672 utime_t start;
673
674 eversion_t pg_local_last_complete;
675
676 ObcLockManager lock_manager;
677
678 list<std::function<void()>> on_applied;
679 list<std::function<void()>> on_committed;
680 list<std::function<void()>> on_success;
681 list<std::function<void()>> on_finish;
682
683 RepGather(
684 OpContext *c, ceph_tid_t rt,
685 eversion_t lc,
686 bool applies_with_commit) :
687 hoid(c->obc->obs.oi.soid),
688 op(c->op),
689 queue_item(this),
690 nref(1),
691 rep_tid(rt),
692 rep_aborted(false), rep_done(false),
693 all_applied(false), all_committed(false),
694 applies_with_commit(applies_with_commit),
695 pg_local_last_complete(lc),
696 lock_manager(std::move(c->lock_manager)),
697 on_applied(std::move(c->on_applied)),
698 on_committed(std::move(c->on_committed)),
699 on_success(std::move(c->on_success)),
700 on_finish(std::move(c->on_finish)) {}
701
702 RepGather(
703 ObcLockManager &&manager,
704 OpRequestRef &&o,
705 boost::optional<std::function<void(void)> > &&on_complete,
706 ceph_tid_t rt,
707 eversion_t lc,
708 bool applies_with_commit,
709 int r) :
710 op(o),
711 queue_item(this),
712 nref(1),
713 r(r),
714 rep_tid(rt),
715 rep_aborted(false), rep_done(false),
716 all_applied(false), all_committed(false),
717 applies_with_commit(applies_with_commit),
718 pg_local_last_complete(lc),
719 lock_manager(std::move(manager)) {
720 if (on_complete) {
721 on_success.push_back(std::move(*on_complete));
722 }
723 }
724
725 RepGather *get() {
726 nref++;
727 return this;
728 }
729 void put() {
730 assert(nref > 0);
731 if (--nref == 0) {
732 assert(on_applied.empty());
733 delete this;
734 //generic_dout(0) << "deleting " << this << dendl;
735 }
736 }
737 };
738
739
740 protected:
741
742 /**
743 * Grabs locks for OpContext, should be cleaned up in close_op_ctx
744 *
745 * @param ctx [in,out] ctx to get locks for
746 * @return true on success, false if we are queued
747 */
748 bool get_rw_locks(bool write_ordered, OpContext *ctx) {
749  /* If snapset_obc is set, then !obc->obs.exists and we always take the
750  * snapdir lock *before* the head lock. Since every caller (read or
751  * write) follows this order, acquiring the first lock guarantees we
752  * can also acquire the second.
753 */
754 if (write_ordered && ctx->op->may_read()) {
755 ctx->lock_type = ObjectContext::RWState::RWEXCL;
756 } else if (write_ordered) {
757 ctx->lock_type = ObjectContext::RWState::RWWRITE;
758 } else {
759 assert(ctx->op->may_read());
760 ctx->lock_type = ObjectContext::RWState::RWREAD;
761 }
762
763 if (ctx->snapset_obc) {
764 assert(!ctx->obc->obs.exists);
765 if (!ctx->lock_manager.get_lock_type(
766 ctx->lock_type,
767 ctx->snapset_obc->obs.oi.soid,
768 ctx->snapset_obc,
769 ctx->op)) {
770 ctx->lock_type = ObjectContext::RWState::RWNONE;
771 return false;
772 }
773 }
774 if (ctx->lock_manager.get_lock_type(
775 ctx->lock_type,
776 ctx->obc->obs.oi.soid,
777 ctx->obc,
778 ctx->op)) {
779 return true;
780 } else {
781 assert(!ctx->snapset_obc);
782 ctx->lock_type = ObjectContext::RWState::RWNONE;
783 return false;
784 }
785 }
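     // Typical call pattern (simplified sketch of the do_op path; see the
     // .cc for the real flow):
     //
     //   if (!get_rw_locks(write_ordered, ctx)) {
     //     // the op was queued on the obc's waiter list by get_lock_type();
     //     // release_object_locks() will requeue it later
     //     op->mark_delayed("waiting for rw locks");
     //     close_op_ctx(ctx);
     //     return;
     //   }
     //   // locks held: proceed with execute_ctx(ctx) ...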
786
787 /**
788 * Cleans up OpContext
789 *
790 * @param ctx [in] ctx to clean up
791 */
792 void close_op_ctx(OpContext *ctx) {
793 release_object_locks(ctx->lock_manager);
794 ctx->op_t.reset();
795 for (auto p = ctx->on_finish.begin();
796 p != ctx->on_finish.end();
797 ctx->on_finish.erase(p++)) {
798 (*p)();
799 }
800 delete ctx;
801 }
802
803 /**
804 * Releases locks
805 *
806 * @param manager [in] manager with locks to release
807 */
808 void release_object_locks(
809 ObcLockManager &lock_manager) {
810 list<pair<hobject_t, list<OpRequestRef> > > to_req;
811 bool requeue_recovery = false;
812 bool requeue_snaptrim = false;
813 lock_manager.put_locks(
814 &to_req,
815 &requeue_recovery,
816 &requeue_snaptrim);
817 if (requeue_recovery)
818 queue_recovery();
819 if (requeue_snaptrim)
820 snap_trimmer_machine.process_event(TrimWriteUnblocked());
821
822 if (!to_req.empty()) {
823 // requeue at front of scrub blocking queue if we are blocked by scrub
824 for (auto &&p: to_req) {
825 if (scrubber.write_blocked_by_scrub(p.first.get_head())) {
826 waiting_for_scrub.splice(
827 waiting_for_scrub.begin(),
828 p.second,
829 p.second.begin(),
830 p.second.end());
831 } else {
832 requeue_ops(p.second);
833 }
834 }
835 }
836 }
837
838 // replica ops
839 // [primary|tail]
840 xlist<RepGather*> repop_queue;
841
842 friend class C_OSD_RepopApplied;
843 friend class C_OSD_RepopCommit;
844 void repop_all_applied(RepGather *repop);
845 void repop_all_committed(RepGather *repop);
846 void eval_repop(RepGather*);
847 void issue_repop(RepGather *repop, OpContext *ctx);
848 RepGather *new_repop(
849 OpContext *ctx,
850 ObjectContextRef obc,
851 ceph_tid_t rep_tid);
852 boost::intrusive_ptr<RepGather> new_repop(
853 eversion_t version,
854 int r,
855 ObcLockManager &&manager,
856 OpRequestRef &&op,
857 boost::optional<std::function<void(void)> > &&on_complete);
858 void remove_repop(RepGather *repop);
859
860 OpContextUPtr simple_opc_create(ObjectContextRef obc);
861 void simple_opc_submit(OpContextUPtr ctx);
862
863 /**
864 * Merge entries atomically into all actingbackfill osds
865 * adjusting missing and recovery state as necessary.
866 *
867 * Also used to store error log entries for dup detection.
868 */
869 void submit_log_entries(
870 const mempool::osd_pglog::list<pg_log_entry_t> &entries,
871 ObcLockManager &&manager,
872 boost::optional<std::function<void(void)> > &&on_complete,
873 OpRequestRef op = OpRequestRef(),
874 int r = 0);
875 struct LogUpdateCtx {
876 boost::intrusive_ptr<RepGather> repop;
877 set<pg_shard_t> waiting_on;
878 };
879 void cancel_log_updates();
880 map<ceph_tid_t, LogUpdateCtx> log_entry_update_waiting_on;
881
882
883 // hot/cold tracking
884 HitSetRef hit_set; ///< currently accumulating HitSet
885 utime_t hit_set_start_stamp; ///< time the current HitSet started recording
886
887
888 void hit_set_clear(); ///< discard any HitSet state
889 void hit_set_setup(); ///< initialize HitSet state
890 void hit_set_create(); ///< create a new HitSet
891 void hit_set_persist(); ///< persist hit info
892 bool hit_set_apply_log(); ///< apply log entries to update in-memory HitSet
893 void hit_set_trim(OpContextUPtr &ctx, unsigned max); ///< discard old HitSets
894 void hit_set_in_memory_trim(uint32_t max_in_memory); ///< discard old in memory HitSets
895 void hit_set_remove_all();
896
897 hobject_t get_hit_set_current_object(utime_t stamp);
898 hobject_t get_hit_set_archive_object(utime_t start,
899 utime_t end,
900 bool using_gmt);
901
902 // agent
903 boost::scoped_ptr<TierAgentState> agent_state;
904
905 void agent_setup(); ///< initialize agent state
906 bool agent_work(int max) override ///< entry point to do some agent work
907 {
908 return agent_work(max, max);
909 }
910 bool agent_work(int max, int agent_flush_quota) override;
911 bool agent_maybe_flush(ObjectContextRef& obc); ///< maybe flush
912 bool agent_maybe_evict(ObjectContextRef& obc, bool after_flush); ///< maybe evict
913
914 void agent_load_hit_sets(); ///< load HitSets, if needed
915
916 /// estimate object atime and temperature
917 ///
918 /// @param oid [in] object name
919  /// @param temperature [out] relative temperature (considers both access time and frequency)
920 void agent_estimate_temp(const hobject_t& oid, int *temperature);
921
922 /// stop the agent
923 void agent_stop() override;
924 void agent_delay() override;
925
926 /// clear agent state
927 void agent_clear() override;
928
929 /// choose (new) agent mode(s), returns true if op is requeued
930 bool agent_choose_mode(bool restart = false, OpRequestRef op = OpRequestRef());
931 void agent_choose_mode_restart() override;
932
933 /// true if we can send an ondisk/commit for v
934 bool already_complete(eversion_t v);
935 /// true if we can send an ack for v
936 bool already_ack(eversion_t v);
937
938 // projected object info
939 SharedLRU<hobject_t, ObjectContext> object_contexts;
940 // map from oid.snapdir() to SnapSetContext *
941 map<hobject_t, SnapSetContext*> snapset_contexts;
942 Mutex snapset_contexts_lock;
943
944  // debug: track the order in which client ops are applied
945 map<hobject_t, map<client_t, ceph_tid_t>> debug_op_order;
946
947 void populate_obc_watchers(ObjectContextRef obc);
948 void check_blacklisted_obc_watchers(ObjectContextRef obc);
949 void check_blacklisted_watchers() override;
950 void get_watchers(list<obj_watch_item_t> &pg_watchers) override;
951 void get_obc_watchers(ObjectContextRef obc, list<obj_watch_item_t> &pg_watchers);
952 public:
953 void handle_watch_timeout(WatchRef watch);
954 protected:
955
956 ObjectContextRef create_object_context(const object_info_t& oi, SnapSetContext *ssc);
957 ObjectContextRef get_object_context(
958 const hobject_t& soid,
959 bool can_create,
960 const map<string, bufferlist> *attrs = 0
961 );
962
963 void context_registry_on_change();
964 void object_context_destructor_callback(ObjectContext *obc);
965 class C_PG_ObjectContext;
966
967 int find_object_context(const hobject_t& oid,
968 ObjectContextRef *pobc,
969 bool can_create,
970 bool map_snapid_to_clone=false,
971 hobject_t *missing_oid=NULL);
972
973 void add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t *stat);
974
975 void get_src_oloc(const object_t& oid, const object_locator_t& oloc, object_locator_t& src_oloc);
976
977 SnapSetContext *get_snapset_context(
978 const hobject_t& oid,
979 bool can_create,
980 const map<string, bufferlist> *attrs = 0,
981  bool oid_existed = true // whether this oid already existed in the backend
982 );
983 void register_snapset_context(SnapSetContext *ssc) {
984 Mutex::Locker l(snapset_contexts_lock);
985 _register_snapset_context(ssc);
986 }
987 void _register_snapset_context(SnapSetContext *ssc) {
988 assert(snapset_contexts_lock.is_locked());
989 if (!ssc->registered) {
990 assert(snapset_contexts.count(ssc->oid) == 0);
991 ssc->registered = true;
992 snapset_contexts[ssc->oid] = ssc;
993 }
994 }
995 void put_snapset_context(SnapSetContext *ssc);
996
997 map<hobject_t, ObjectContextRef> recovering;
998
999 /*
1000 * Backfill
1001 *
1002 * peer_info[backfill_target].last_backfill == info.last_backfill on the peer.
1003 *
1004 * objects prior to peer_info[backfill_target].last_backfill
1005 * - are on the peer
1006 * - are included in the peer stats
1007 *
1008 * objects \in (last_backfill, last_backfill_started]
1009 * - are on the peer or are in backfills_in_flight
1010 * - are not included in pg stats (yet)
1011 * - have their stats in pending_backfill_updates on the primary
1012 */
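     // Worked example (illustrative): if peer_info[t].last_backfill sorts at
     // hash X and last_backfill_started at hash Y (X < Y), then an object
     // whose hash falls in (X, Y] has either been pushed to the peer already
     // or is in backfills_in_flight, and its stat delta sits in
     // pending_backfill_updates until the peer's last_backfill moves past it.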
1013 set<hobject_t> backfills_in_flight;
1014 map<hobject_t, pg_stat_t> pending_backfill_updates;
1015
1016 void dump_recovery_info(Formatter *f) const override {
1017 f->open_array_section("backfill_targets");
1018 for (set<pg_shard_t>::const_iterator p = backfill_targets.begin();
1019 p != backfill_targets.end(); ++p)
1020 f->dump_stream("replica") << *p;
1021 f->close_section();
1022 f->open_array_section("waiting_on_backfill");
1023 for (set<pg_shard_t>::const_iterator p = waiting_on_backfill.begin();
1024 p != waiting_on_backfill.end(); ++p)
1025 f->dump_stream("osd") << *p;
1026 f->close_section();
1027 f->dump_stream("last_backfill_started") << last_backfill_started;
1028 {
1029 f->open_object_section("backfill_info");
1030 backfill_info.dump(f);
1031 f->close_section();
1032 }
1033 {
1034 f->open_array_section("peer_backfill_info");
1035 for (map<pg_shard_t, BackfillInterval>::const_iterator pbi =
1036 peer_backfill_info.begin();
1037 pbi != peer_backfill_info.end(); ++pbi) {
1038 f->dump_stream("osd") << pbi->first;
1039 f->open_object_section("BackfillInterval");
1040 pbi->second.dump(f);
1041 f->close_section();
1042 }
1043 f->close_section();
1044 }
1045 {
1046 f->open_array_section("backfills_in_flight");
1047 for (set<hobject_t>::const_iterator i = backfills_in_flight.begin();
1048 i != backfills_in_flight.end();
1049 ++i) {
1050 f->dump_stream("object") << *i;
1051 }
1052 f->close_section();
1053 }
1054 {
1055 f->open_array_section("recovering");
1056 for (map<hobject_t, ObjectContextRef>::const_iterator i = recovering.begin();
1057 i != recovering.end();
1058 ++i) {
1059 f->dump_stream("object") << i->first;
1060 }
1061 f->close_section();
1062 }
1063 {
1064 f->open_object_section("pg_backend");
1065 pgbackend->dump_recovery_info(f);
1066 f->close_section();
1067 }
1068 }
1069
1070 /// last backfill operation started
1071 hobject_t last_backfill_started;
1072 bool new_backfill;
1073
1074 int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
1075 PGBackend::RecoveryHandle *h);
1076
1077 void finish_degraded_object(const hobject_t& oid);
1078
1079 // Cancels/resets pulls from peer
1080  void check_recovery_sources(const OSDMapRef& map) override;
1081
1082 int recover_missing(
1083 const hobject_t& oid,
1084 eversion_t v,
1085 int priority,
1086 PGBackend::RecoveryHandle *h);
1087
1088 // low level ops
1089
1090 void _make_clone(
1091 OpContext *ctx,
1092 PGTransaction* t,
1093 ObjectContextRef obc,
1094 const hobject_t& head, const hobject_t& coid,
1095 object_info_t *poi);
1096 void execute_ctx(OpContext *ctx);
1097 void finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc=true);
1098 void reply_ctx(OpContext *ctx, int err);
1099 void reply_ctx(OpContext *ctx, int err, eversion_t v, version_t uv);
1100 void make_writeable(OpContext *ctx);
1101 void log_op_stats(OpContext *ctx);
1102
1103 void write_update_size_and_usage(object_stat_sum_t& stats, object_info_t& oi,
1104 interval_set<uint64_t>& modified, uint64_t offset,
1105 uint64_t length, bool write_full=false);
1106 void add_interval_usage(interval_set<uint64_t>& s, object_stat_sum_t& st);
1107
1108
1109 enum class cache_result_t {
1110 NOOP,
1111 BLOCKED_FULL,
1112 BLOCKED_PROMOTE,
1113 HANDLED_PROXY,
1114 HANDLED_REDIRECT,
1115 };
1116 cache_result_t maybe_handle_cache_detail(OpRequestRef op,
1117 bool write_ordered,
1118 ObjectContextRef obc, int r,
1119 hobject_t missing_oid,
1120 bool must_promote,
1121 bool in_hit_set,
1122 ObjectContextRef *promote_obc);
1123 cache_result_t maybe_handle_manifest_detail(OpRequestRef op,
1124 bool write_ordered,
1125 ObjectContextRef obc);
1126 bool maybe_handle_manifest(OpRequestRef op,
1127 bool write_ordered,
1128 ObjectContextRef obc) {
1129 return cache_result_t::NOOP != maybe_handle_manifest_detail(
1130 op,
1131 write_ordered,
1132 obc);
1133 }
1134
1135 /**
1136 * This helper function is called from do_op if the ObjectContext lookup fails.
1137 * @returns true if the caching code is handling the Op, false otherwise.
1138 */
1139 bool maybe_handle_cache(OpRequestRef op,
1140 bool write_ordered,
1141 ObjectContextRef obc, int r,
1142 const hobject_t& missing_oid,
1143 bool must_promote,
1144 bool in_hit_set = false) {
1145 return cache_result_t::NOOP != maybe_handle_cache_detail(
1146 op,
1147 write_ordered,
1148 obc,
1149 r,
1150 missing_oid,
1151 must_promote,
1152 in_hit_set,
1153 nullptr);
1154 }
1155
1156 /**
1157 * This helper function checks if a promotion is needed.
1158 */
1159 bool maybe_promote(ObjectContextRef obc,
1160 const hobject_t& missing_oid,
1161 const object_locator_t& oloc,
1162 bool in_hit_set,
1163 uint32_t recency,
1164 OpRequestRef promote_op,
1165 ObjectContextRef *promote_obc = nullptr);
1166 /**
1167 * This helper function tells the client to redirect their request elsewhere.
1168 */
1169 void do_cache_redirect(OpRequestRef op);
1170 /**
1171 * This function attempts to start a promote. Either it succeeds,
1172 * or places op on a wait list. If op is null, failure means that
1173 * this is a noop. If a future user wants to be able to distinguish
1174 * these cases, a return value should be added.
1175 */
1176 void promote_object(
1177 ObjectContextRef obc, ///< [optional] obc
1178 const hobject_t& missing_object, ///< oid (if !obc)
1179 const object_locator_t& oloc, ///< locator for obc|oid
1180 OpRequestRef op, ///< [optional] client op
1181 ObjectContextRef *promote_obc = nullptr ///< [optional] new obc for object
1182 );
1183
1184 int prepare_transaction(OpContext *ctx);
1185 list<pair<OpRequestRef, OpContext*> > in_progress_async_reads;
1186 void complete_read_ctx(int result, OpContext *ctx);
1187
1188 // pg on-disk content
1189 void check_local() override;
1190
1191 void _clear_recovery_state() override;
1192
1193 bool start_recovery_ops(
1194 uint64_t max,
1195 ThreadPool::TPHandle &handle, uint64_t *started) override;
1196
1197 uint64_t recover_primary(uint64_t max, ThreadPool::TPHandle &handle);
1198 uint64_t recover_replicas(uint64_t max, ThreadPool::TPHandle &handle);
1199 hobject_t earliest_peer_backfill() const;
1200 bool all_peer_done() const;
1201 /**
1202 * @param work_started will be set to true if recover_backfill got anywhere
1203 * @returns the number of operations started
1204 */
1205 uint64_t recover_backfill(uint64_t max, ThreadPool::TPHandle &handle,
1206 bool *work_started);
1207
1208 /**
1209 * scan a (hash) range of objects in the current pg
1210 *
1211  * @param min return at least this many items, unless we are done
1212  * @param max return no more than this many items
1213  * @param bi [in,out] resulting map of objects to eversion_t's; scanning
1214  *        starts at the first object >= bi->begin
1215 */
1216 void scan_range(
1217 int min, int max, BackfillInterval *bi,
1218 ThreadPool::TPHandle &handle
1219 );
1220
1221 /// Update a hash range to reflect changes since the last scan
1222 void update_range(
1223 BackfillInterval *bi, ///< [in,out] interval to update
1224 ThreadPool::TPHandle &handle ///< [in] tp handle
1225 );
1226
1227 int prep_backfill_object_push(
1228 hobject_t oid, eversion_t v, ObjectContextRef obc,
1229 vector<pg_shard_t> peers,
1230 PGBackend::RecoveryHandle *h);
1231 void send_remove_op(const hobject_t& oid, eversion_t v, pg_shard_t peer);
1232
1233
1234 class C_OSD_OndiskWriteUnlock;
1235 class C_OSD_AppliedRecoveredObject;
1236 class C_OSD_CommittedPushedObject;
1237 class C_OSD_AppliedRecoveredObjectReplica;
1238 void sub_op_remove(OpRequestRef op);
1239
1240 void _applied_recovered_object(ObjectContextRef obc);
1241 void _applied_recovered_object_replica();
1242 void _committed_pushed_object(epoch_t epoch, eversion_t lc);
1243 void recover_got(hobject_t oid, eversion_t v);
1244
1245 // -- copyfrom --
1246 map<hobject_t, CopyOpRef> copy_ops;
1247
1248 int fill_in_copy_get(
1249 OpContext *ctx,
1250 bufferlist::iterator& bp,
1251 OSDOp& op,
1252 ObjectContextRef& obc);
1253 void fill_in_copy_get_noent(OpRequestRef& op, hobject_t oid,
1254 OSDOp& osd_op);
1255
1256 /**
1257 * To copy an object, call start_copy.
1258 *
1259 * @param cb: The CopyCallback to be activated when the copy is complete
1260 * @param obc: The ObjectContext we are copying into
1261 * @param src: The source object
1262 * @param oloc: the source object locator
1263 * @param version: the version of the source object to copy (0 for any)
1264 */
1265 void start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src,
1266 object_locator_t oloc, version_t version, unsigned flags,
1267 bool mirror_snapset, unsigned src_obj_fadvise_flags,
1268 unsigned dest_obj_fadvise_flags);
1269 void process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r);
1270 void _write_copy_chunk(CopyOpRef cop, PGTransaction *t);
1271 uint64_t get_copy_chunk_size() const {
1272 uint64_t size = cct->_conf->osd_copyfrom_max_chunk;
1273 if (pool.info.requires_aligned_append()) {
1274 uint64_t alignment = pool.info.required_alignment();
1275 if (size % alignment) {
1276 size += alignment - (size % alignment);
1277 }
1278 }
1279 return size;
1280 }
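     // Worked example (illustrative values): with osd_copyfrom_max_chunk set
     // to 8 MiB and a pool that requires 64 KiB-aligned appends (as erasure
     // coded pools may), 8 MiB is already a multiple of 64 KiB and is used
     // as-is; a hypothetical 3 MiB alignment would round the chunk up to
     // 9 MiB (8 MiB % 3 MiB == 2 MiB, so 1 MiB is added).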
1281 void _copy_some(ObjectContextRef obc, CopyOpRef cop);
1282 void finish_copyfrom(OpContext *ctx);
1283 void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
1284 void cancel_copy(CopyOpRef cop, bool requeue);
1285 void cancel_copy_ops(bool requeue);
1286
1287 friend struct C_Copyfrom;
1288
1289 // -- flush --
1290 map<hobject_t, FlushOpRef> flush_ops;
1291
1292 /// start_flush takes ownership of on_flush iff ret == -EINPROGRESS
1293 int start_flush(
1294 OpRequestRef op, ObjectContextRef obc,
1295 bool blocking, hobject_t *pmissing,
1296 boost::optional<std::function<void()>> &&on_flush);
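     // Informal note: on any return value other than -EINPROGRESS the caller
     // still owns on_flush and must invoke or discard it itself; only an
     // in-progress flush hands the callback to the FlushOp to be run when the
     // flush completes or is cancelled.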
1297 void finish_flush(hobject_t oid, ceph_tid_t tid, int r);
1298 int try_flush_mark_clean(FlushOpRef fop);
1299 void cancel_flush(FlushOpRef fop, bool requeue);
1300 void cancel_flush_ops(bool requeue);
1301
1302  /// @return false if the clone has been evicted
1303 bool is_present_clone(hobject_t coid);
1304
1305 friend struct C_Flush;
1306
1307 // -- scrub --
1308 bool _range_available_for_scrub(
1309 const hobject_t &begin, const hobject_t &end) override;
1310 void scrub_snapshot_metadata(
1311 ScrubMap &map,
1312 const std::map<hobject_t, pair<uint32_t, uint32_t>> &missing_digest) override;
1313 void _scrub_clear_state() override;
1314 void _scrub_finish() override;
1315 object_stat_collection_t scrub_cstat;
1316
1317 void _split_into(pg_t child_pgid, PG *child,
1318 unsigned split_bits) override;
1319 void apply_and_flush_repops(bool requeue);
1320
1321 void calc_trim_to() override;
1322 int do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr);
1323 int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr);
1324
1325 // -- checksum --
1326 int do_checksum(OpContext *ctx, OSDOp& osd_op, bufferlist::iterator *bl_it,
1327 bool *async_read);
1328 int finish_checksum(OSDOp& osd_op, Checksummer::CSumType csum_type,
1329 bufferlist::iterator *init_value_bl_it,
1330 const bufferlist &read_bl);
1331
1332 friend class C_ChecksumRead;
1333
1334 int do_extent_cmp(OpContext *ctx, OSDOp& osd_op);
1335 int do_writesame(OpContext *ctx, OSDOp& osd_op);
1336
1337 bool pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata);
1338 int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter);
1339
1340 map<hobject_t, list<OpRequestRef>> in_progress_proxy_ops;
1341 void kick_proxy_ops_blocked(hobject_t& soid);
1342 void cancel_proxy_ops(bool requeue);
1343
1344 // -- proxyread --
1345 map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;
1346
1347 void do_proxy_read(OpRequestRef op, ObjectContextRef obc = NULL);
1348 void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
1349 void cancel_proxy_read(ProxyReadOpRef prdop);
1350
1351 friend struct C_ProxyRead;
1352
1353 // -- proxywrite --
1354 map<ceph_tid_t, ProxyWriteOpRef> proxywrite_ops;
1355
1356 void do_proxy_write(OpRequestRef op, const hobject_t& missing_oid, ObjectContextRef obc = NULL);
1357 void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r);
1358 void cancel_proxy_write(ProxyWriteOpRef pwop);
1359
1360 friend struct C_ProxyWrite_Commit;
1361
1362 public:
1363 PrimaryLogPG(OSDService *o, OSDMapRef curmap,
1364 const PGPool &_pool, spg_t p);
1365 ~PrimaryLogPG() override {}
1366
1367 int do_command(
1368 cmdmap_t cmdmap,
1369 ostream& ss,
1370 bufferlist& idata,
1371 bufferlist& odata,
1372 ConnectionRef conn,
1373 ceph_tid_t tid) override;
1374
1375 void do_request(
1376 OpRequestRef& op,
1377 ThreadPool::TPHandle &handle) override;
1378 void do_op(OpRequestRef& op) override;
1379 void record_write_error(OpRequestRef op, const hobject_t &soid,
1380 MOSDOpReply *orig_reply, int r);
1381 void do_pg_op(OpRequestRef op);
1382 void do_sub_op(OpRequestRef op) override;
1383 void do_sub_op_reply(OpRequestRef op) override;
1384 void do_scan(
1385 OpRequestRef op,
1386 ThreadPool::TPHandle &handle) override;
1387 void do_backfill(OpRequestRef op) override;
1388 void do_backfill_remove(OpRequestRef op);
1389
1390 void handle_backoff(OpRequestRef& op);
1391
1392 int trim_object(bool first, const hobject_t &coid, OpContextUPtr *ctxp);
1393 void snap_trimmer(epoch_t e) override;
1394 void kick_snap_trim() override;
1395 void snap_trimmer_scrub_complete() override;
1396 int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops);
1397
1398 int _get_tmap(OpContext *ctx, bufferlist *header, bufferlist *vals);
1399 int do_tmap2omap(OpContext *ctx, unsigned flags);
1400 int do_tmapup(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op);
1401 int do_tmapup_slow(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op, bufferlist& bl);
1402
1403 void do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn);
1404 private:
1405 int do_scrub_ls(MOSDOp *op, OSDOp *osd_op);
1406 hobject_t earliest_backfill() const;
1407 bool check_src_targ(const hobject_t& soid, const hobject_t& toid) const;
1408
1409 uint64_t temp_seq; ///< last id for naming temp objects
1410 /// generate a new temp object name
1411 hobject_t generate_temp_object(const hobject_t& target);
1412 /// generate a new temp object name (for recovery)
1413 hobject_t get_temp_recovery_object(const hobject_t& target,
1414 eversion_t version) override;
1415 int get_recovery_op_priority() const {
1416 int pri = 0;
1417 pool.info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
1418 return pri > 0 ? pri : cct->_conf->osd_recovery_op_priority;
1419 }
1420 void log_missing(unsigned missing,
1421 const boost::optional<hobject_t> &head,
1422 LogChannelRef clog,
1423 const spg_t &pgid,
1424 const char *func,
1425 const char *mode,
1426 bool allow_incomplete_clones);
1427 unsigned process_clones_to(const boost::optional<hobject_t> &head,
1428 const boost::optional<SnapSet> &snapset,
1429 LogChannelRef clog,
1430 const spg_t &pgid,
1431 const char *mode,
1432 bool allow_incomplete_clones,
1433 boost::optional<snapid_t> target,
1434 vector<snapid_t>::reverse_iterator *curclone,
1435 inconsistent_snapset_wrapper &snap_error);
1436
1437 public:
1438 coll_t get_coll() {
1439 return coll;
1440 }
1441 void split_colls(
1442 spg_t child,
1443 int split_bits,
1444 int seed,
1445 const pg_pool_t *pool,
1446 ObjectStore::Transaction *t) override {
1447 coll_t target = coll_t(child);
1448 PG::_create(*t, child, split_bits);
1449 t->split_collection(
1450 coll,
1451 split_bits,
1452 seed,
1453 target);
1454 PG::_init(*t, child, pool);
1455 }
1456 private:
1457
1458 struct DoSnapWork : boost::statechart::event< DoSnapWork > {
1459 DoSnapWork() : boost::statechart::event < DoSnapWork >() {}
1460 };
1461 struct KickTrim : boost::statechart::event< KickTrim > {
1462 KickTrim() : boost::statechart::event < KickTrim >() {}
1463 };
1464 struct RepopsComplete : boost::statechart::event< RepopsComplete > {
1465 RepopsComplete() : boost::statechart::event < RepopsComplete >() {}
1466 };
1467 struct ScrubComplete : boost::statechart::event< ScrubComplete > {
1468 ScrubComplete() : boost::statechart::event < ScrubComplete >() {}
1469 };
1470 struct TrimWriteUnblocked : boost::statechart::event< TrimWriteUnblocked > {
1471 TrimWriteUnblocked() : boost::statechart::event < TrimWriteUnblocked >() {}
1472 };
1473 struct Reset : boost::statechart::event< Reset > {
1474 Reset() : boost::statechart::event< Reset >() {}
1475 };
1476 struct SnapTrimReserved : boost::statechart::event< SnapTrimReserved > {
1477 SnapTrimReserved() : boost::statechart::event< SnapTrimReserved >() {}
1478 };
1479 struct SnapTrimTimerReady : boost::statechart::event< SnapTrimTimerReady > {
1480 SnapTrimTimerReady() : boost::statechart::event< SnapTrimTimerReady >() {}
1481 };
1482
1483 struct NotTrimming;
1484 struct SnapTrimmer : public boost::statechart::state_machine< SnapTrimmer, NotTrimming > {
1485 PrimaryLogPG *pg;
1486 explicit SnapTrimmer(PrimaryLogPG *pg) : pg(pg) {}
1487 void log_enter(const char *state_name);
1488 void log_exit(const char *state_name, utime_t duration);
1489 bool can_trim() {
1490 return pg->is_clean() && !pg->scrubber.active && !pg->snap_trimq.empty();
1491 }
1492 } snap_trimmer_machine;
1493
1494 struct WaitReservation;
1495 struct Trimming : boost::statechart::state< Trimming, SnapTrimmer, WaitReservation >, NamedState {
1496 typedef boost::mpl::list <
1497 boost::statechart::custom_reaction< KickTrim >,
1498 boost::statechart::transition< Reset, NotTrimming >
1499 > reactions;
1500
1501 set<hobject_t> in_flight;
1502 snapid_t snap_to_trim;
1503
1504 explicit Trimming(my_context ctx)
1505 : my_base(ctx),
1506 NamedState(context< SnapTrimmer >().pg, "Trimming") {
1507 context< SnapTrimmer >().log_enter(state_name);
1508 assert(context< SnapTrimmer >().can_trim());
1509 assert(in_flight.empty());
1510 }
1511 void exit() {
1512 context< SnapTrimmer >().log_exit(state_name, enter_time);
1513 auto *pg = context< SnapTrimmer >().pg;
1514 pg->osd->snap_reserver.cancel_reservation(pg->get_pgid());
1515 pg->state_clear(PG_STATE_SNAPTRIM);
1516 pg->publish_stats_to_osd();
1517 }
1518 boost::statechart::result react(const KickTrim&) {
1519 return discard_event();
1520 }
1521 };
1522
1523 /* SnapTrimmerStates */
1524 struct WaitTrimTimer : boost::statechart::state< WaitTrimTimer, Trimming >, NamedState {
1525 typedef boost::mpl::list <
1526 boost::statechart::custom_reaction< SnapTrimTimerReady >
1527 > reactions;
1528 Context *wakeup = nullptr;
1529 explicit WaitTrimTimer(my_context ctx)
1530 : my_base(ctx),
1531 NamedState(context< SnapTrimmer >().pg, "Trimming/WaitTrimTimer") {
1532 context< SnapTrimmer >().log_enter(state_name);
1533 assert(context<Trimming>().in_flight.empty());
1534 struct OnTimer : Context {
1535 PrimaryLogPGRef pg;
1536 epoch_t epoch;
1537 OnTimer(PrimaryLogPGRef pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
1538 void finish(int) override {
1539 pg->lock();
1540 if (!pg->pg_has_reset_since(epoch))
1541 pg->snap_trimmer_machine.process_event(SnapTrimTimerReady());
1542 pg->unlock();
1543 }
1544 };
1545 auto *pg = context< SnapTrimmer >().pg;
1546 if (pg->cct->_conf->osd_snap_trim_sleep > 0) {
1547 wakeup = new OnTimer{pg, pg->get_osdmap()->get_epoch()};
1548 Mutex::Locker l(pg->osd->snap_sleep_lock);
1549 pg->osd->snap_sleep_timer.add_event_after(
1550 pg->cct->_conf->osd_snap_trim_sleep, wakeup);
1551 } else {
1552 post_event(SnapTrimTimerReady());
1553 }
1554 }
1555 void exit() {
1556 context< SnapTrimmer >().log_exit(state_name, enter_time);
1557 auto *pg = context< SnapTrimmer >().pg;
1558 if (wakeup) {
1559 Mutex::Locker l(pg->osd->snap_sleep_lock);
1560 pg->osd->snap_sleep_timer.cancel_event(wakeup);
1561 wakeup = nullptr;
1562 }
1563 }
1564 boost::statechart::result react(const SnapTrimTimerReady &) {
1565 wakeup = nullptr;
1566 if (!context< SnapTrimmer >().can_trim()) {
1567 post_event(KickTrim());
1568 return transit< NotTrimming >();
1569 } else {
1570 return transit< AwaitAsyncWork >();
1571 }
1572 }
1573 };
1574
1575 struct WaitRWLock : boost::statechart::state< WaitRWLock, Trimming >, NamedState {
1576 typedef boost::mpl::list <
1577 boost::statechart::custom_reaction< TrimWriteUnblocked >
1578 > reactions;
1579 explicit WaitRWLock(my_context ctx)
1580 : my_base(ctx),
1581 NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRWLock") {
1582 context< SnapTrimmer >().log_enter(state_name);
1583 assert(context<Trimming>().in_flight.empty());
1584 }
1585 void exit() {
1586 context< SnapTrimmer >().log_exit(state_name, enter_time);
1587 }
1588 boost::statechart::result react(const TrimWriteUnblocked&) {
1589 if (!context< SnapTrimmer >().can_trim()) {
1590 post_event(KickTrim());
1591 return transit< NotTrimming >();
1592 } else {
1593 return transit< AwaitAsyncWork >();
1594 }
1595 }
1596 };
1597
1598 struct WaitRepops : boost::statechart::state< WaitRepops, Trimming >, NamedState {
1599 typedef boost::mpl::list <
1600 boost::statechart::custom_reaction< RepopsComplete >
1601 > reactions;
1602 explicit WaitRepops(my_context ctx)
1603 : my_base(ctx),
1604 NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRepops") {
1605 context< SnapTrimmer >().log_enter(state_name);
1606 assert(!context<Trimming>().in_flight.empty());
1607 }
1608 void exit() {
1609 context< SnapTrimmer >().log_exit(state_name, enter_time);
1610 }
1611 boost::statechart::result react(const RepopsComplete&) {
1612 if (!context< SnapTrimmer >().can_trim()) {
1613 post_event(KickTrim());
1614 return transit< NotTrimming >();
1615 } else {
1616 return transit< WaitTrimTimer >();
1617 }
1618 }
1619 };
1620
1621 struct AwaitAsyncWork : boost::statechart::state< AwaitAsyncWork, Trimming >, NamedState {
1622 typedef boost::mpl::list <
1623 boost::statechart::custom_reaction< DoSnapWork >
1624 > reactions;
1625 explicit AwaitAsyncWork(my_context ctx);
1626 void exit() {
1627 context< SnapTrimmer >().log_exit(state_name, enter_time);
1628 }
1629 boost::statechart::result react(const DoSnapWork&);
1630 };
1631
1632 struct WaitReservation : boost::statechart::state< WaitReservation, Trimming >, NamedState {
1633 /* WaitReservation is a sub-state of trimming simply so that exiting Trimming
1634 * always cancels the reservation */
1635 typedef boost::mpl::list <
1636 boost::statechart::custom_reaction< SnapTrimReserved >
1637 > reactions;
1638 struct ReservationCB : public Context {
1639 PrimaryLogPGRef pg;
1640 bool canceled;
1641 ReservationCB(PrimaryLogPG *pg) : pg(pg), canceled(false) {}
1642 void finish(int) override {
1643 pg->lock();
1644 if (!canceled)
1645 pg->snap_trimmer_machine.process_event(SnapTrimReserved());
1646 pg->unlock();
1647 }
1648 void cancel() {
1649 assert(pg->is_locked());
1650 assert(!canceled);
1651 canceled = true;
1652 }
1653 };
1654 ReservationCB *pending = nullptr;
1655
1656 explicit WaitReservation(my_context ctx)
1657 : my_base(ctx),
1658 NamedState(context< SnapTrimmer >().pg, "Trimming/WaitReservation") {
1659 context< SnapTrimmer >().log_enter(state_name);
1660 assert(context<Trimming>().in_flight.empty());
1661 auto *pg = context< SnapTrimmer >().pg;
1662 pending = new ReservationCB(pg);
1663 pg->osd->snap_reserver.request_reservation(
1664 pg->get_pgid(),
1665 pending,
1666 0);
1667 pg->state_set(PG_STATE_SNAPTRIM_WAIT);
1668 pg->publish_stats_to_osd();
1669 }
1670 boost::statechart::result react(const SnapTrimReserved&);
1671 void exit() {
1672 context< SnapTrimmer >().log_exit(state_name, enter_time);
1673 if (pending)
1674 pending->cancel();
1675 pending = nullptr;
1676 auto *pg = context< SnapTrimmer >().pg;
1677 pg->state_clear(PG_STATE_SNAPTRIM_WAIT);
1678 pg->state_clear(PG_STATE_SNAPTRIM_ERROR);
1679 pg->publish_stats_to_osd();
1680 }
1681 };
1682
1683 struct WaitScrub : boost::statechart::state< WaitScrub, SnapTrimmer >, NamedState {
1684 typedef boost::mpl::list <
1685 boost::statechart::custom_reaction< ScrubComplete >,
1686 boost::statechart::custom_reaction< KickTrim >,
1687 boost::statechart::transition< Reset, NotTrimming >
1688 > reactions;
1689 explicit WaitScrub(my_context ctx)
1690 : my_base(ctx),
1691 NamedState(context< SnapTrimmer >().pg, "Trimming/WaitScrub") {
1692 context< SnapTrimmer >().log_enter(state_name);
1693 }
1694 void exit() {
1695 context< SnapTrimmer >().log_exit(state_name, enter_time);
1696 }
1697 boost::statechart::result react(const ScrubComplete&) {
1698 post_event(KickTrim());
1699 return transit< NotTrimming >();
1700 }
1701 boost::statechart::result react(const KickTrim&) {
1702 return discard_event();
1703 }
1704 };
1705
1706 struct NotTrimming : boost::statechart::state< NotTrimming, SnapTrimmer >, NamedState {
1707 typedef boost::mpl::list <
1708 boost::statechart::custom_reaction< KickTrim >,
1709 boost::statechart::transition< Reset, NotTrimming >
1710 > reactions;
1711 explicit NotTrimming(my_context ctx);
1712 void exit();
1713 boost::statechart::result react(const KickTrim&);
1714 };
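     // Rough state flow (informal sketch; the actual reactions live in
     // PrimaryLogPG.cc): NotTrimming --KickTrim--> Trimming, whose initial
     // substate WaitReservation asks the OSD's snap_reserver for a slot.
     // Once SnapTrimReserved arrives, AwaitAsyncWork queues the trim work;
     // between passes the trimmer may wait on a blocked object lock
     // (WaitRWLock), on outstanding repops (WaitRepops) and on the
     // osd_snap_trim_sleep timer (WaitTrimTimer). WaitScrub defers trimming
     // while scrub holds the objects, and a Reset or can_trim() turning
     // false drops the machine back to NotTrimming.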
1715
1716 int _verify_no_head_clones(const hobject_t& soid,
1717 const SnapSet& ss);
1718  // create the local object (and account for it) if needed; a whiteout
1719  // or an unchanged object results in no new object.
1720 void maybe_create_new_object(OpContext *ctx, bool ignore_transaction=false);
1721 int _delete_oid(OpContext *ctx, bool no_whiteout, bool try_no_whiteout);
1722 int _rollback_to(OpContext *ctx, ceph_osd_op& op);
1723 public:
1724 bool is_missing_object(const hobject_t& oid) const;
1725 bool is_unreadable_object(const hobject_t &oid) const {
1726 return is_missing_object(oid) ||
1727 !missing_loc.readable_with_acting(oid, actingset);
1728 }
1729 void maybe_kick_recovery(const hobject_t &soid);
1730 void wait_for_unreadable_object(const hobject_t& oid, OpRequestRef op);
1731 void wait_for_all_missing(OpRequestRef op);
1732
1733 bool is_degraded_or_backfilling_object(const hobject_t& oid);
1734 void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
1735
1736 void block_write_on_full_cache(
1737 const hobject_t& oid, OpRequestRef op);
1738 void block_for_clean(
1739 const hobject_t& oid, OpRequestRef op);
1740 void block_write_on_snap_rollback(
1741 const hobject_t& oid, ObjectContextRef obc, OpRequestRef op);
1742 void block_write_on_degraded_snap(const hobject_t& oid, OpRequestRef op);
1743
1744 bool maybe_await_blocked_snapset(const hobject_t &soid, OpRequestRef op);
1745 void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
1746 void kick_object_context_blocked(ObjectContextRef obc);
1747
1748 void maybe_force_recovery();
1749
1750 void mark_all_unfound_lost(
1751 int what,
1752 ConnectionRef con,
1753 ceph_tid_t tid);
1754 eversion_t pick_newest_available(const hobject_t& oid);
1755
1756 void do_update_log_missing(
1757 OpRequestRef &op);
1758
1759 void do_update_log_missing_reply(
1760 OpRequestRef &op);
1761
1762 void on_role_change() override;
1763 void on_pool_change() override;
1764 void _on_new_interval() override;
1765 void on_change(ObjectStore::Transaction *t) override;
1766 void on_activate() override;
1767 void on_flushed() override;
1768 void on_removal(ObjectStore::Transaction *t) override;
1769 void on_shutdown() override;
1770 bool check_failsafe_full(ostream &ss) override;
1771 bool check_osdmap_full(const set<pg_shard_t> &missing_on) override;
1772 int rep_repair_primary_object(const hobject_t& soid, OpRequestRef op);
1773
1774 // attr cache handling
1775 void setattr_maybe_cache(
1776 ObjectContextRef obc,
1777 OpContext *op,
1778 PGTransaction *t,
1779 const string &key,
1780 bufferlist &val);
1781 void setattrs_maybe_cache(
1782 ObjectContextRef obc,
1783 OpContext *op,
1784 PGTransaction *t,
1785 map<string, bufferlist> &attrs);
1786 void rmattr_maybe_cache(
1787 ObjectContextRef obc,
1788 OpContext *op,
1789 PGTransaction *t,
1790 const string &key);
1791 int getattr_maybe_cache(
1792 ObjectContextRef obc,
1793 const string &key,
1794 bufferlist *val);
1795 int getattrs_maybe_cache(
1796 ObjectContextRef obc,
1797 map<string, bufferlist> *out,
1798 bool user_only = false);
1799 };
1800
1801 inline ostream& operator<<(ostream& out, const PrimaryLogPG::RepGather& repop)
1802 {
1803 out << "repgather(" << &repop
1804 << " " << repop.v
1805 << " rep_tid=" << repop.rep_tid
1806 << " committed?=" << repop.all_committed
1807 << " applied?=" << repop.all_applied
1808 << " r=" << repop.r
1809 << ")";
1810 return out;
1811 }
1812
1813 inline ostream& operator<<(ostream& out,
1814 const PrimaryLogPG::ProxyWriteOpRef& pwop)
1815 {
1816 out << "proxywrite(" << &pwop
1817 << " " << pwop->user_version
1818 << " pwop_tid=" << pwop->objecter_tid;
1819 if (pwop->ctx->op)
1820 out << " op=" << *(pwop->ctx->op->get_req());
1821 out << ")";
1822 return out;
1823 }
1824
1825 void intrusive_ptr_add_ref(PrimaryLogPG::RepGather *repop);
1826 void intrusive_ptr_release(PrimaryLogPG::RepGather *repop);
1827
1828
1829 #endif