ceph/src/osd/PrimaryLogPG.h (ceph.git, v12.2.3)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 /*
3 * Ceph - scalable distributed file system
4 *
5 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
6 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
7 *
8 * Author: Loic Dachary <loic@dachary.org>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_REPLICATEDPG_H
18 #define CEPH_REPLICATEDPG_H
19
20 #include <boost/tuple/tuple.hpp>
21 #include "include/assert.h"
22 #include "PG.h"
23 #include "Watch.h"
24 #include "TierAgentState.h"
25 #include "messages/MOSDOpReply.h"
26 #include "common/Checksummer.h"
27 #include "common/sharedptr_registry.hpp"
28 #include "ReplicatedBackend.h"
29 #include "PGTransaction.h"
30
31 class CopyFromCallback;
32 class PromoteCallback;
33
34 class PrimaryLogPG;
35 class PGLSFilter;
36 class HitSet;
37 struct TierAgentState;
38 class MOSDOp;
39 class MOSDOpReply;
40 class OSDService;
41
42 void intrusive_ptr_add_ref(PrimaryLogPG *pg);
43 void intrusive_ptr_release(PrimaryLogPG *pg);
44 uint64_t get_with_id(PrimaryLogPG *pg);
45 void put_with_id(PrimaryLogPG *pg, uint64_t id);
46
47 #ifdef PG_DEBUG_REFS
48 typedef TrackedIntPtr<PrimaryLogPG> PrimaryLogPGRef;
49 #else
50 typedef boost::intrusive_ptr<PrimaryLogPG> PrimaryLogPGRef;
51 #endif
52
53 struct inconsistent_snapset_wrapper;
54
55 class PrimaryLogPG : public PG, public PGBackend::Listener {
56 friend class OSD;
57 friend class Watch;
58
59 public:
60 MEMPOOL_CLASS_HELPERS();
61
62 /*
63 * state associated with a copy operation
64 */
65 struct OpContext;
66 class CopyCallback;
67
68 /**
69 * CopyResults stores the object metadata of interest to a copy initiator.
70 */
71 struct CopyResults {
72 ceph::real_time mtime; ///< the copy source's mtime
73 uint64_t object_size; ///< the copied object's size
74 bool started_temp_obj; ///< true if the callback needs to delete temp object
75 hobject_t temp_oid; ///< temp object (if any)
76
77 /**
78 * Function to fill in transaction; if non-empty the callback
79 * must execute it before any other accesses to the object
80 * (in order to complete the copy).
81 */
82 std::function<void(PGTransaction *)> fill_in_final_tx;
83
84 version_t user_version; ///< The copy source's user version
85 bool should_requeue; ///< op should be requeued on cancel
86 vector<snapid_t> snaps; ///< src's snaps (if clone)
87 snapid_t snap_seq; ///< src's snap_seq (if head)
88 librados::snap_set_t snapset; ///< src snapset (if head)
89 bool mirror_snapset;
90 bool has_omap;
91 uint32_t flags; // object_copy_data_t::FLAG_*
92 uint32_t source_data_digest, source_omap_digest;
93 uint32_t data_digest, omap_digest;
94 mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
95 map<string, bufferlist> attrs; // xattrs
96 uint64_t truncate_seq;
97 uint64_t truncate_size;
98 bool is_data_digest() {
99 return flags & object_copy_data_t::FLAG_DATA_DIGEST;
100 }
101 bool is_omap_digest() {
102 return flags & object_copy_data_t::FLAG_OMAP_DIGEST;
103 }
104 CopyResults()
105 : object_size(0), started_temp_obj(false),
106 user_version(0),
107 should_requeue(false), mirror_snapset(false),
108 has_omap(false),
109 flags(0),
110 source_data_digest(-1), source_omap_digest(-1),
111 data_digest(-1), omap_digest(-1),
112 truncate_seq(0), truncate_size(0)
113 {}
114 };
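  /*
   * A minimal sketch of how a copy initiator might consume a CopyResults once
   * the copy completes (on_copy_done and verify_data_crc are hypothetical
   * helpers; 'tx' is an assumed PGTransaction*):
   *
   *   void on_copy_done(int r, CopyResults *res) {
   *     if (r < 0)
   *       return;                              // copy failed, nothing to apply
   *     if (res->is_data_digest())
   *       verify_data_crc(res->data_digest);   // digest is only valid if flagged
   *     if (res->fill_in_final_tx)
   *       res->fill_in_final_tx(tx);           // must run before touching the object
   *   }
   */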
115
116 struct CopyOp {
117 CopyCallback *cb;
118 ObjectContextRef obc;
119 hobject_t src;
120 object_locator_t oloc;
121 unsigned flags;
122 bool mirror_snapset;
123
124 CopyResults results;
125
126 ceph_tid_t objecter_tid;
127 ceph_tid_t objecter_tid2;
128
129 object_copy_cursor_t cursor;
130 map<string,bufferlist> attrs;
131 bufferlist data;
132 bufferlist omap_header;
133 bufferlist omap_data;
134 int rval;
135
136 object_copy_cursor_t temp_cursor;
137
138     /*
139      * For CopyOp the process is:
140      * step 1: read the data (attrs/omap/data) from the source object
141      * step 2: use that data to create the new (destination) object
142      * src_obj_fadvise_flags is used in step 1;
143      * dest_obj_fadvise_flags is used in step 2 (see the sketch after the CopyOpRef typedef below)
144      */
145 unsigned src_obj_fadvise_flags;
146 unsigned dest_obj_fadvise_flags;
147
148 CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s,
149 object_locator_t l,
150 version_t v,
151 unsigned f,
152 bool ms,
153 unsigned src_obj_fadvise_flags,
154 unsigned dest_obj_fadvise_flags)
155 : cb(cb_), obc(_obc), src(s), oloc(l), flags(f),
156 mirror_snapset(ms),
157 objecter_tid(0),
158 objecter_tid2(0),
159 rval(-1),
160 src_obj_fadvise_flags(src_obj_fadvise_flags),
161 dest_obj_fadvise_flags(dest_obj_fadvise_flags)
162 {
163 results.user_version = v;
164 results.mirror_snapset = mirror_snapset;
165 }
166 };
167 typedef ceph::shared_ptr<CopyOp> CopyOpRef;
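  /*
   * Sketch of choosing the two fadvise flag sets when building a CopyOp (the
   * CEPH_OSD_OP_FLAG_FADVISE_* constants are real; this particular combination
   * and the surrounding variable names are only an example):
   *
   *   unsigned src_flags  = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
   *                         CEPH_OSD_OP_FLAG_FADVISE_NOCACHE;    // step 1: source reads
   *   unsigned dest_flags = CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;   // step 2: destination writes
   *   CopyOpRef cop(std::make_shared<CopyOp>(cb, obc, src, oloc, version,
   *                                          flags, mirror_snapset,
   *                                          src_flags, dest_flags));
   */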
168
169 /**
170 * The CopyCallback class defines an interface for completions to the
171 * copy_start code. Users of the copy infrastructure must implement
172 * one and give an instance of the class to start_copy.
173 *
174 * The implementer is responsible for making sure that the CopyCallback
175 * can associate itself with the correct copy operation.
176 */
177 typedef boost::tuple<int, CopyResults*> CopyCallbackResults;
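  /*
   * Rough shape of a CopyCallback user, assuming the callback is ultimately
   * completed with the CopyCallbackResults tuple defined above.  CopyCallback
   * itself is only forward-declared in this header, so the override shown is a
   * sketch and MyCopyCallback is illustrative:
   *
   *   struct MyCopyCallback : public CopyCallback {
   *     void finish(CopyCallbackResults results) {
   *       int r = results.get<0>();
   *       CopyResults *copy_results = results.get<1>();
   *       // match copy_results back to the copy we started via start_copy()
   *     }
   *   };
   */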
178
179 friend class CopyFromCallback;
180 friend class CopyFromFinisher;
181 friend class PromoteCallback;
182
183 struct ProxyReadOp {
184 OpRequestRef op;
185 hobject_t soid;
186 ceph_tid_t objecter_tid;
187 vector<OSDOp> &ops;
188 version_t user_version;
189 int data_offset;
190 bool canceled; ///< true if canceled
191
192 ProxyReadOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops)
193 : op(_op), soid(oid),
194 objecter_tid(0), ops(_ops),
195 user_version(0), data_offset(0),
196 canceled(false) { }
197 };
198 typedef ceph::shared_ptr<ProxyReadOp> ProxyReadOpRef;
199
200 struct ProxyWriteOp {
201 OpContext *ctx;
202 OpRequestRef op;
203 hobject_t soid;
204 ceph_tid_t objecter_tid;
205 vector<OSDOp> &ops;
206 version_t user_version;
207 bool sent_reply;
208 utime_t mtime;
209 bool canceled;
210 osd_reqid_t reqid;
211
212 ProxyWriteOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops, osd_reqid_t _reqid)
213 : ctx(NULL), op(_op), soid(oid),
214 objecter_tid(0), ops(_ops),
215 user_version(0), sent_reply(false),
216 canceled(false),
217 reqid(_reqid) { }
218 };
219 typedef ceph::shared_ptr<ProxyWriteOp> ProxyWriteOpRef;
220
221 struct FlushOp {
222 ObjectContextRef obc; ///< obc we are flushing
223 OpRequestRef op; ///< initiating op
224 list<OpRequestRef> dup_ops; ///< bandwagon jumpers
225 version_t flushed_version; ///< user version we are flushing
226 ceph_tid_t objecter_tid; ///< copy-from request tid
227 int rval; ///< copy-from result
228 bool blocking; ///< whether we are blocking updates
229 bool removal; ///< we are removing the backend object
230 boost::optional<std::function<void()>> on_flush; ///< callback, may be null
231
232 FlushOp()
233 : flushed_version(0), objecter_tid(0), rval(0),
234 blocking(false), removal(false) {}
235 ~FlushOp() { assert(!on_flush); }
236 };
237 typedef ceph::shared_ptr<FlushOp> FlushOpRef;
238
239 boost::scoped_ptr<PGBackend> pgbackend;
240 PGBackend *get_pgbackend() override {
241 return pgbackend.get();
242 }
243
244 /// Listener methods
245 DoutPrefixProvider *get_dpp() override {
246 return this;
247 }
248
249 void on_local_recover(
250 const hobject_t &oid,
251 const ObjectRecoveryInfo &recovery_info,
252 ObjectContextRef obc,
253 bool is_delete,
254 ObjectStore::Transaction *t
255 ) override;
256 void on_peer_recover(
257 pg_shard_t peer,
258 const hobject_t &oid,
259 const ObjectRecoveryInfo &recovery_info
260 ) override;
261 void begin_peer_recover(
262 pg_shard_t peer,
263 const hobject_t oid) override;
264 void on_global_recover(
265 const hobject_t &oid,
266 const object_stat_sum_t &stat_diff,
267 bool is_delete) override;
268 void failed_push(const list<pg_shard_t> &from, const hobject_t &soid) override;
269 void primary_failed(const hobject_t &soid) override;
270 bool primary_error(const hobject_t& soid, eversion_t v) override;
271 void cancel_pull(const hobject_t &soid) override;
272 void apply_stats(
273 const hobject_t &soid,
274 const object_stat_sum_t &delta_stats) override;
275 void on_primary_error(const hobject_t &oid, eversion_t v) override;
276 void backfill_add_missing(const hobject_t &oid, eversion_t v) override;
277 void remove_missing_object(const hobject_t &oid,
278 eversion_t v,
279 Context *on_complete) override;
280
281 template<class T> class BlessedGenContext;
282 class BlessedContext;
283 Context *bless_context(Context *c) override;
284
285 GenContext<ThreadPool::TPHandle&> *bless_gencontext(
286 GenContext<ThreadPool::TPHandle&> *c) override;
287
288 void send_message(int to_osd, Message *m) override {
289 osd->send_message_osd_cluster(to_osd, m, get_osdmap()->get_epoch());
290 }
291 void queue_transaction(ObjectStore::Transaction&& t,
292 OpRequestRef op) override {
293 osd->store->queue_transaction(osr.get(), std::move(t), 0, 0, 0, op);
294 }
295 void queue_transactions(vector<ObjectStore::Transaction>& tls,
296 OpRequestRef op) override {
297 osd->store->queue_transactions(osr.get(), tls, 0, 0, 0, op, NULL);
298 }
299 epoch_t get_epoch() const override {
300 return get_osdmap()->get_epoch();
301 }
302 epoch_t get_interval_start_epoch() const override {
303 return info.history.same_interval_since;
304 }
305 epoch_t get_last_peering_reset_epoch() const override {
306 return get_last_peering_reset();
307 }
308 const set<pg_shard_t> &get_actingbackfill_shards() const override {
309 return actingbackfill;
310 }
311 const set<pg_shard_t> &get_acting_shards() const override {
312 return actingset;
313 }
314 const set<pg_shard_t> &get_backfill_shards() const override {
315 return backfill_targets;
316 }
317
318 std::string gen_dbg_prefix() const override { return gen_prefix(); }
319
320 const map<hobject_t, set<pg_shard_t>>
321 &get_missing_loc_shards() const override {
322 return missing_loc.get_missing_locs();
323 }
324 const map<pg_shard_t, pg_missing_t> &get_shard_missing() const override {
325 return peer_missing;
326 }
327 using PGBackend::Listener::get_shard_missing;
328 const map<pg_shard_t, pg_info_t> &get_shard_info() const override {
329 return peer_info;
330 }
331 using PGBackend::Listener::get_shard_info;
332 const pg_missing_tracker_t &get_local_missing() const override {
333 return pg_log.get_missing();
334 }
335 const PGLog &get_log() const override {
336 return pg_log;
337 }
338 bool pgb_is_primary() const override {
339 return is_primary();
340 }
341 OSDMapRef pgb_get_osdmap() const override {
342 return get_osdmap();
343 }
344 const pg_info_t &get_info() const override {
345 return info;
346 }
347 const pg_pool_t &get_pool() const override {
348 return pool.info;
349 }
350
351 ObjectContextRef get_obc(
352 const hobject_t &hoid,
353 const map<string, bufferlist> &attrs) override {
354 return get_object_context(hoid, true, &attrs);
355 }
356
357 bool try_lock_for_read(
358 const hobject_t &hoid,
359 ObcLockManager &manager) override {
360 if (is_missing_object(hoid))
361 return false;
362 auto obc = get_object_context(hoid, false, nullptr);
363 if (!obc)
364 return false;
365 return manager.try_get_read_lock(hoid, obc);
366 }
367
368 void release_locks(ObcLockManager &manager) override {
369 release_object_locks(manager);
370 }
371
372 void pgb_set_object_snap_mapping(
373 const hobject_t &soid,
374 const set<snapid_t> &snaps,
375 ObjectStore::Transaction *t) override {
376 return update_object_snap_mapping(t, soid, snaps);
377 }
378 void pgb_clear_object_snap_mapping(
379 const hobject_t &soid,
380 ObjectStore::Transaction *t) override {
381 return clear_object_snap_mapping(t, soid);
382 }
383
384 void log_operation(
385 const vector<pg_log_entry_t> &logv,
386 const boost::optional<pg_hit_set_history_t> &hset_history,
387 const eversion_t &trim_to,
388 const eversion_t &roll_forward_to,
389 bool transaction_applied,
390 ObjectStore::Transaction &t) override {
391 if (hset_history) {
392 info.hit_set = *hset_history;
393 }
394 append_log(logv, trim_to, roll_forward_to, t, transaction_applied);
395 }
396
397 struct C_OSD_OnApplied;
398 void op_applied(
399 const eversion_t &applied_version) override;
400
401 bool should_send_op(
402 pg_shard_t peer,
403 const hobject_t &hoid) override {
404 if (peer == get_primary())
405 return true;
406 assert(peer_info.count(peer));
407 bool should_send =
408 hoid.pool != (int64_t)info.pgid.pool() ||
409 hoid <= last_backfill_started ||
410 hoid <= peer_info[peer].last_backfill;
411 if (!should_send)
412 assert(is_backfill_targets(peer));
413 return should_send;
414 }
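  /*
   * Worked example for the predicate above (hypothetical hash positions): if a
   * backfill target peer has last_backfill = B and this primary has
   * last_backfill_started = S with B < S, then an op on an object sorting
   * <= S is sent (the peer already has it or it is being backfilled right
   * now), while an op on an object sorting after both B and S is skipped;
   * that object will simply be (re)created on the peer when backfill reaches
   * it.
   */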
415
416 void update_peer_last_complete_ondisk(
417 pg_shard_t fromosd,
418 eversion_t lcod) override {
419 peer_last_complete_ondisk[fromosd] = lcod;
420 }
421
422 void update_last_complete_ondisk(
423 eversion_t lcod) override {
424 last_complete_ondisk = lcod;
425 }
426
427 void update_stats(
428 const pg_stat_t &stat) override {
429 info.stats = stat;
430 }
431
432 void schedule_recovery_work(
433 GenContext<ThreadPool::TPHandle&> *c) override;
434
435 pg_shard_t whoami_shard() const override {
436 return pg_whoami;
437 }
438 spg_t primary_spg_t() const override {
439 return spg_t(info.pgid.pgid, primary.shard);
440 }
441 pg_shard_t primary_shard() const override {
442 return primary;
443 }
444 uint64_t min_peer_features() const override {
445 return get_min_peer_features();
446 }
447
448 void send_message_osd_cluster(
449 int peer, Message *m, epoch_t from_epoch) override;
450 void send_message_osd_cluster(
451 Message *m, Connection *con) override;
452 void send_message_osd_cluster(
453 Message *m, const ConnectionRef& con) override;
454 ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch) override;
455 entity_name_t get_cluster_msgr_name() override {
456 return osd->get_cluster_msgr_name();
457 }
458
459 PerfCounters *get_logger() override;
460
461 ceph_tid_t get_tid() override { return osd->get_tid(); }
462
463 LogClientTemp clog_error() override { return osd->clog->error(); }
464 LogClientTemp clog_warn() override { return osd->clog->warn(); }
465
466 struct watch_disconnect_t {
467 uint64_t cookie;
468 entity_name_t name;
469 bool send_disconnect;
470 watch_disconnect_t(uint64_t c, entity_name_t n, bool sd)
471 : cookie(c), name(n), send_disconnect(sd) {}
472 };
473 void complete_disconnect_watches(
474 ObjectContextRef obc,
475 const list<watch_disconnect_t> &to_disconnect);
476
477 struct OpFinisher {
478 virtual ~OpFinisher() {
479 }
480
481 virtual int execute() = 0;
482 };
483
484 /*
485 * Capture all object state associated with an in-progress read or write.
486 */
487 struct OpContext {
488 OpRequestRef op;
489 osd_reqid_t reqid;
490 vector<OSDOp> *ops;
491
492 const ObjectState *obs; // Old objectstate
493 const SnapSet *snapset; // Old snapset
494
495 ObjectState new_obs; // resulting ObjectState
496 SnapSet new_snapset; // resulting SnapSet (in case of a write)
497 //pg_stat_t new_stats; // resulting Stats
498 object_stat_sum_t delta_stats;
499
500 bool modify; // (force) modification (even if op_t is empty)
501 bool user_modify; // user-visible modification
502 bool undirty; // user explicitly un-dirtying this object
503 bool cache_evict; ///< true if this is a cache eviction
504 bool ignore_cache; ///< true if IGNORE_CACHE flag is set
505 bool ignore_log_op_stats; // don't log op stats
506 bool update_log_only; ///< this is a write that returned an error - just record in pg log for dup detection
507
508 // side effects
509 list<pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
510 list<watch_disconnect_t> watch_disconnects; ///< old watch + send_discon
511 list<notify_info_t> notifies;
512 struct NotifyAck {
513 boost::optional<uint64_t> watch_cookie;
514 uint64_t notify_id;
515 bufferlist reply_bl;
516 explicit NotifyAck(uint64_t notify_id) : notify_id(notify_id) {}
517 NotifyAck(uint64_t notify_id, uint64_t cookie, bufferlist& rbl)
518 : watch_cookie(cookie), notify_id(notify_id) {
519 reply_bl.claim(rbl);
520 }
521 };
522 list<NotifyAck> notify_acks;
523
524 uint64_t bytes_written, bytes_read;
525
526 utime_t mtime;
527 SnapContext snapc; // writer snap context
528 eversion_t at_version; // pg's current version pointer
529 version_t user_at_version; // pg's current user version pointer
530
531 /// index of the current subop - only valid inside of do_osd_ops()
532 int current_osd_subop_num;
533 /// total number of subops processed in this context for cls_cxx_subop_version()
534 int processed_subop_count = 0;
535
536 PGTransactionUPtr op_t;
537 vector<pg_log_entry_t> log;
538 boost::optional<pg_hit_set_history_t> updated_hset_history;
539
540 interval_set<uint64_t> modified_ranges;
541 ObjectContextRef obc;
542 ObjectContextRef clone_obc; // if we created a clone
543 ObjectContextRef snapset_obc; // if we created/deleted a snapdir
544
545 // FIXME: we may want to kill this msgr hint off at some point!
546 boost::optional<int> data_off = boost::none;
547
548 MOSDOpReply *reply;
549
550 PrimaryLogPG *pg;
551
552 int num_read; ///< count read ops
553 int num_write; ///< count update ops
554
555 mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > extra_reqids;
556
557 hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking
558
559 list<std::function<void()>> on_applied;
560 list<std::function<void()>> on_committed;
561 list<std::function<void()>> on_finish;
562 list<std::function<void()>> on_success;
563 template <typename F>
564 void register_on_finish(F &&f) {
565 on_finish.emplace_back(std::forward<F>(f));
566 }
567 template <typename F>
568 void register_on_success(F &&f) {
569 on_success.emplace_back(std::forward<F>(f));
570 }
571 template <typename F>
572 void register_on_applied(F &&f) {
573 on_applied.emplace_back(std::forward<F>(f));
574 }
575 template <typename F>
576 void register_on_commit(F &&f) {
577 on_committed.emplace_back(std::forward<F>(f));
578 }
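  /*
   * Typical usage sketch for the helpers above (the lambda bodies are
   * illustrative; send_commit_reply and do_notifies are hypothetical helpers):
   *
   *   ctx->register_on_commit([=]() {
   *     // runs once the update is committed on all replicas
   *     send_commit_reply(ctx->reply);
   *   });
   *   ctx->register_on_success([=]() { do_notifies(); });
   */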
579
580 bool sent_reply;
581
582 // pending async reads <off, len, op_flags> -> <outbl, outr>
583 list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
584 pair<bufferlist*, Context*> > > pending_async_reads;
585 int inflightreads;
586 friend struct OnReadComplete;
587 void start_async_reads(PrimaryLogPG *pg);
588 void finish_read(PrimaryLogPG *pg);
589 bool async_reads_complete() {
590 return inflightreads == 0;
591 }
592
593 ObjectContext::RWState::State lock_type;
594 ObcLockManager lock_manager;
595
596 std::map<int, std::unique_ptr<OpFinisher>> op_finishers;
597
598 OpContext(const OpContext& other);
599 const OpContext& operator=(const OpContext& other);
600
601 OpContext(OpRequestRef _op, osd_reqid_t _reqid, vector<OSDOp>* _ops,
602 ObjectContextRef& obc,
603 PrimaryLogPG *_pg) :
604 op(_op), reqid(_reqid), ops(_ops),
605 obs(&obc->obs),
606 snapset(0),
607 new_obs(obs->oi, obs->exists),
608 modify(false), user_modify(false), undirty(false), cache_evict(false),
609 ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
610 bytes_written(0), bytes_read(0), user_at_version(0),
611 current_osd_subop_num(0),
612 obc(obc),
613 reply(NULL), pg(_pg),
614 num_read(0),
615 num_write(0),
616 sent_reply(false),
617 inflightreads(0),
618 lock_type(ObjectContext::RWState::RWNONE) {
619 if (obc->ssc) {
620 new_snapset = obc->ssc->snapset;
621 snapset = &obc->ssc->snapset;
622 }
623 }
624 OpContext(OpRequestRef _op, osd_reqid_t _reqid,
625 vector<OSDOp>* _ops, PrimaryLogPG *_pg) :
626 op(_op), reqid(_reqid), ops(_ops), obs(NULL), snapset(0),
627 modify(false), user_modify(false), undirty(false), cache_evict(false),
628 ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
629 bytes_written(0), bytes_read(0), user_at_version(0),
630 current_osd_subop_num(0),
631 reply(NULL), pg(_pg),
632 num_read(0),
633 num_write(0),
634 inflightreads(0),
635 lock_type(ObjectContext::RWState::RWNONE) {}
636 void reset_obs(ObjectContextRef obc) {
637 new_obs = ObjectState(obc->obs.oi, obc->obs.exists);
638 if (obc->ssc) {
639 new_snapset = obc->ssc->snapset;
640 snapset = &obc->ssc->snapset;
641 }
642 }
643 ~OpContext() {
644 assert(!op_t);
645 if (reply)
646 reply->put();
647 for (list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
648 pair<bufferlist*, Context*> > >::iterator i =
649 pending_async_reads.begin();
650 i != pending_async_reads.end();
651 pending_async_reads.erase(i++)) {
652 delete i->second.second;
653 }
654 }
655 uint64_t get_features() {
656 if (op && op->get_req()) {
657 return op->get_req()->get_connection()->get_features();
658 }
659 return -1ull;
660 }
661 };
662 using OpContextUPtr = std::unique_ptr<OpContext>;
663 friend struct OpContext;
664
665 /*
666 * State on the PG primary associated with the replicated mutation
667 */
668 class RepGather {
669 public:
670 hobject_t hoid;
671 OpRequestRef op;
672 xlist<RepGather*>::item queue_item;
673 int nref;
674
675 eversion_t v;
676 int r = 0;
677
678 ceph_tid_t rep_tid;
679
680 bool rep_aborted, rep_done;
681
682 bool all_applied;
683 bool all_committed;
684 const bool applies_with_commit;
685
686 utime_t start;
687
688 eversion_t pg_local_last_complete;
689
690 ObcLockManager lock_manager;
691
692 list<std::function<void()>> on_applied;
693 list<std::function<void()>> on_committed;
694 list<std::function<void()>> on_success;
695 list<std::function<void()>> on_finish;
696
697 RepGather(
698 OpContext *c, ceph_tid_t rt,
699 eversion_t lc,
700 bool applies_with_commit) :
701 hoid(c->obc->obs.oi.soid),
702 op(c->op),
703 queue_item(this),
704 nref(1),
705 rep_tid(rt),
706 rep_aborted(false), rep_done(false),
707 all_applied(false), all_committed(false),
708 applies_with_commit(applies_with_commit),
709 pg_local_last_complete(lc),
710 lock_manager(std::move(c->lock_manager)),
711 on_applied(std::move(c->on_applied)),
712 on_committed(std::move(c->on_committed)),
713 on_success(std::move(c->on_success)),
714 on_finish(std::move(c->on_finish)) {}
715
716 RepGather(
717 ObcLockManager &&manager,
718 OpRequestRef &&o,
719 boost::optional<std::function<void(void)> > &&on_complete,
720 ceph_tid_t rt,
721 eversion_t lc,
722 bool applies_with_commit,
723 int r) :
724 op(o),
725 queue_item(this),
726 nref(1),
727 r(r),
728 rep_tid(rt),
729 rep_aborted(false), rep_done(false),
730 all_applied(false), all_committed(false),
731 applies_with_commit(applies_with_commit),
732 pg_local_last_complete(lc),
733 lock_manager(std::move(manager)) {
734 if (on_complete) {
735 on_success.push_back(std::move(*on_complete));
736 }
737 }
738
739 RepGather *get() {
740 nref++;
741 return this;
742 }
743 void put() {
744 assert(nref > 0);
745 if (--nref == 0) {
746 assert(on_applied.empty());
747 delete this;
748 //generic_dout(0) << "deleting " << this << dendl;
749 }
750 }
751 };
752
753
754 protected:
755
756 /**
757 * Grabs locks for OpContext, should be cleaned up in close_op_ctx
758 *
759 * @param ctx [in,out] ctx to get locks for
760 * @return true on success, false if we are queued
761 */
762 bool get_rw_locks(bool write_ordered, OpContext *ctx) {
763     /* If snapset_obc is set, then !obc->obs.exists and we always take the
764      * snapdir lock *before* the head lock.  Since all callers (read or
765      * write) acquire the locks in this order, getting the first lock
766      * guarantees we will get the second.
767      */
768 if (write_ordered && ctx->op->may_read()) {
769 ctx->lock_type = ObjectContext::RWState::RWEXCL;
770 } else if (write_ordered) {
771 ctx->lock_type = ObjectContext::RWState::RWWRITE;
772 } else {
773 assert(ctx->op->may_read());
774 ctx->lock_type = ObjectContext::RWState::RWREAD;
775 }
776
777 if (ctx->snapset_obc) {
778 assert(!ctx->obc->obs.exists);
779 if (!ctx->lock_manager.get_lock_type(
780 ctx->lock_type,
781 ctx->snapset_obc->obs.oi.soid,
782 ctx->snapset_obc,
783 ctx->op)) {
784 ctx->lock_type = ObjectContext::RWState::RWNONE;
785 return false;
786 }
787 }
788 if (ctx->lock_manager.get_lock_type(
789 ctx->lock_type,
790 ctx->obc->obs.oi.soid,
791 ctx->obc,
792 ctx->op)) {
793 return true;
794 } else {
795 assert(!ctx->snapset_obc);
796 ctx->lock_type = ObjectContext::RWState::RWNONE;
797 return false;
798 }
799 }
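  /*
   * Lock-ordering sketch for get_rw_locks() above (object names are
   * hypothetical): for a write to a head that does not exist yet but whose
   * snapdir does, the manager takes the lock on the snapdir obc first and only
   * then on the head:
   *
   *   // snapdir first ...
   *   ctx->lock_manager.get_lock_type(ObjectContext::RWState::RWWRITE,
   *                                   snapdir_oid, snapset_obc, op);
   *   // ... then head
   *   ctx->lock_manager.get_lock_type(ObjectContext::RWState::RWWRITE,
   *                                   head_oid, head_obc, op);
   *
   * Since every caller acquires in that order, holding the snapdir lock
   * guarantees the head lock can also be obtained.
   */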
800
801 /**
802 * Cleans up OpContext
803 *
804 * @param ctx [in] ctx to clean up
805 */
806 void close_op_ctx(OpContext *ctx);
807
808 /**
809 * Releases locks
810 *
811 * @param manager [in] manager with locks to release
812 */
813 void release_object_locks(
814 ObcLockManager &lock_manager) {
815 list<pair<hobject_t, list<OpRequestRef> > > to_req;
816 bool requeue_recovery = false;
817 bool requeue_snaptrim = false;
818 lock_manager.put_locks(
819 &to_req,
820 &requeue_recovery,
821 &requeue_snaptrim);
822 if (requeue_recovery)
823 queue_recovery();
824 if (requeue_snaptrim)
825 snap_trimmer_machine.process_event(TrimWriteUnblocked());
826
827 if (!to_req.empty()) {
828 // requeue at front of scrub blocking queue if we are blocked by scrub
829 for (auto &&p: to_req) {
830 if (scrubber.write_blocked_by_scrub(p.first.get_head())) {
831 waiting_for_scrub.splice(
832 waiting_for_scrub.begin(),
833 p.second,
834 p.second.begin(),
835 p.second.end());
836 } else {
837 requeue_ops(p.second);
838 }
839 }
840 }
841 }
842
843 // replica ops
844 // [primary|tail]
845 xlist<RepGather*> repop_queue;
846
847 friend class C_OSD_RepopApplied;
848 friend class C_OSD_RepopCommit;
849 void repop_all_applied(RepGather *repop);
850 void repop_all_committed(RepGather *repop);
851 void eval_repop(RepGather*);
852 void issue_repop(RepGather *repop, OpContext *ctx);
853 RepGather *new_repop(
854 OpContext *ctx,
855 ObjectContextRef obc,
856 ceph_tid_t rep_tid);
857 boost::intrusive_ptr<RepGather> new_repop(
858 eversion_t version,
859 int r,
860 ObcLockManager &&manager,
861 OpRequestRef &&op,
862 boost::optional<std::function<void(void)> > &&on_complete);
863 void remove_repop(RepGather *repop);
864
865 OpContextUPtr simple_opc_create(ObjectContextRef obc);
866 void simple_opc_submit(OpContextUPtr ctx);
867
868 /**
869    * Atomically merge log entries into the logs of all actingbackfill osds,
870    * adjusting missing and recovery state as necessary.
871 *
872 * Also used to store error log entries for dup detection.
873 */
874 void submit_log_entries(
875 const mempool::osd_pglog::list<pg_log_entry_t> &entries,
876 ObcLockManager &&manager,
877 boost::optional<std::function<void(void)> > &&on_complete,
878 OpRequestRef op = OpRequestRef(),
879 int r = 0);
880 struct LogUpdateCtx {
881 boost::intrusive_ptr<RepGather> repop;
882 set<pg_shard_t> waiting_on;
883 };
884 void cancel_log_updates();
885 map<ceph_tid_t, LogUpdateCtx> log_entry_update_waiting_on;
886
887
888 // hot/cold tracking
889 HitSetRef hit_set; ///< currently accumulating HitSet
890 utime_t hit_set_start_stamp; ///< time the current HitSet started recording
891
892
893 void hit_set_clear(); ///< discard any HitSet state
894 void hit_set_setup(); ///< initialize HitSet state
895 void hit_set_create(); ///< create a new HitSet
896 void hit_set_persist(); ///< persist hit info
897 bool hit_set_apply_log(); ///< apply log entries to update in-memory HitSet
898 void hit_set_trim(OpContextUPtr &ctx, unsigned max); ///< discard old HitSets
899 void hit_set_in_memory_trim(uint32_t max_in_memory); ///< discard old in memory HitSets
900 void hit_set_remove_all();
901
902 hobject_t get_hit_set_current_object(utime_t stamp);
903 hobject_t get_hit_set_archive_object(utime_t start,
904 utime_t end,
905 bool using_gmt);
906
907 // agent
908 boost::scoped_ptr<TierAgentState> agent_state;
909
910 void agent_setup(); ///< initialize agent state
911 bool agent_work(int max) override ///< entry point to do some agent work
912 {
913 return agent_work(max, max);
914 }
915 bool agent_work(int max, int agent_flush_quota) override;
916 bool agent_maybe_flush(ObjectContextRef& obc); ///< maybe flush
917 bool agent_maybe_evict(ObjectContextRef& obc, bool after_flush); ///< maybe evict
918
919 void agent_load_hit_sets(); ///< load HitSets, if needed
920
921 /// estimate object atime and temperature
922 ///
923 /// @param oid [in] object name
924   /// @param temperature [out] relative temperature (considers both access time and frequency)
925 void agent_estimate_temp(const hobject_t& oid, int *temperature);
926
927 /// stop the agent
928 void agent_stop() override;
929 void agent_delay() override;
930
931 /// clear agent state
932 void agent_clear() override;
933
934 /// choose (new) agent mode(s), returns true if op is requeued
935 bool agent_choose_mode(bool restart = false, OpRequestRef op = OpRequestRef());
936 void agent_choose_mode_restart() override;
937
938 /// true if we can send an ondisk/commit for v
939 bool already_complete(eversion_t v);
940 /// true if we can send an ack for v
941 bool already_ack(eversion_t v);
942
943 // projected object info
944 SharedLRU<hobject_t, ObjectContext> object_contexts;
945 // map from oid.snapdir() to SnapSetContext *
946 map<hobject_t, SnapSetContext*> snapset_contexts;
947 Mutex snapset_contexts_lock;
948
949 // debug order that client ops are applied
950 map<hobject_t, map<client_t, ceph_tid_t>> debug_op_order;
951
952 void populate_obc_watchers(ObjectContextRef obc);
953 void check_blacklisted_obc_watchers(ObjectContextRef obc);
954 void check_blacklisted_watchers() override;
955 void get_watchers(list<obj_watch_item_t> &pg_watchers) override;
956 void get_obc_watchers(ObjectContextRef obc, list<obj_watch_item_t> &pg_watchers);
957 public:
958 void handle_watch_timeout(WatchRef watch);
959 protected:
960
961 ObjectContextRef create_object_context(const object_info_t& oi, SnapSetContext *ssc);
962 ObjectContextRef get_object_context(
963 const hobject_t& soid,
964 bool can_create,
965 const map<string, bufferlist> *attrs = 0
966 );
967
968 void context_registry_on_change();
969 void object_context_destructor_callback(ObjectContext *obc);
970 class C_PG_ObjectContext;
971
972 int find_object_context(const hobject_t& oid,
973 ObjectContextRef *pobc,
974 bool can_create,
975 bool map_snapid_to_clone=false,
976 hobject_t *missing_oid=NULL);
977
978 void add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t *stat);
979
980 void get_src_oloc(const object_t& oid, const object_locator_t& oloc, object_locator_t& src_oloc);
981
982 SnapSetContext *get_snapset_context(
983 const hobject_t& oid,
984 bool can_create,
985 const map<string, bufferlist> *attrs = 0,
986     bool oid_existed = true // indicates whether this oid existed in the backend
987 );
988 void register_snapset_context(SnapSetContext *ssc) {
989 Mutex::Locker l(snapset_contexts_lock);
990 _register_snapset_context(ssc);
991 }
992 void _register_snapset_context(SnapSetContext *ssc) {
993 assert(snapset_contexts_lock.is_locked());
994 if (!ssc->registered) {
995 assert(snapset_contexts.count(ssc->oid) == 0);
996 ssc->registered = true;
997 snapset_contexts[ssc->oid] = ssc;
998 }
999 }
1000 void put_snapset_context(SnapSetContext *ssc);
1001
1002 map<hobject_t, ObjectContextRef> recovering;
1003
1004 /*
1005 * Backfill
1006 *
1007 * peer_info[backfill_target].last_backfill == info.last_backfill on the peer.
1008 *
1009 * objects prior to peer_info[backfill_target].last_backfill
1010 * - are on the peer
1011 * - are included in the peer stats
1012 *
1013 * objects \in (last_backfill, last_backfill_started]
1014 * - are on the peer or are in backfills_in_flight
1015 * - are not included in pg stats (yet)
1016 * - have their stats in pending_backfill_updates on the primary
1017 */
1018 set<hobject_t> backfills_in_flight;
1019 map<hobject_t, pg_stat_t> pending_backfill_updates;
1020
1021 void dump_recovery_info(Formatter *f) const override {
1022 f->open_array_section("backfill_targets");
1023 for (set<pg_shard_t>::const_iterator p = backfill_targets.begin();
1024 p != backfill_targets.end(); ++p)
1025 f->dump_stream("replica") << *p;
1026 f->close_section();
1027 f->open_array_section("waiting_on_backfill");
1028 for (set<pg_shard_t>::const_iterator p = waiting_on_backfill.begin();
1029 p != waiting_on_backfill.end(); ++p)
1030 f->dump_stream("osd") << *p;
1031 f->close_section();
1032 f->dump_stream("last_backfill_started") << last_backfill_started;
1033 {
1034 f->open_object_section("backfill_info");
1035 backfill_info.dump(f);
1036 f->close_section();
1037 }
1038 {
1039 f->open_array_section("peer_backfill_info");
1040 for (map<pg_shard_t, BackfillInterval>::const_iterator pbi =
1041 peer_backfill_info.begin();
1042 pbi != peer_backfill_info.end(); ++pbi) {
1043 f->dump_stream("osd") << pbi->first;
1044 f->open_object_section("BackfillInterval");
1045 pbi->second.dump(f);
1046 f->close_section();
1047 }
1048 f->close_section();
1049 }
1050 {
1051 f->open_array_section("backfills_in_flight");
1052 for (set<hobject_t>::const_iterator i = backfills_in_flight.begin();
1053 i != backfills_in_flight.end();
1054 ++i) {
1055 f->dump_stream("object") << *i;
1056 }
1057 f->close_section();
1058 }
1059 {
1060 f->open_array_section("recovering");
1061 for (map<hobject_t, ObjectContextRef>::const_iterator i = recovering.begin();
1062 i != recovering.end();
1063 ++i) {
1064 f->dump_stream("object") << i->first;
1065 }
1066 f->close_section();
1067 }
1068 {
1069 f->open_object_section("pg_backend");
1070 pgbackend->dump_recovery_info(f);
1071 f->close_section();
1072 }
1073 }
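  /*
   * Roughly, when rendered through a JSON Formatter the section above comes
   * out shaped like the sketch below (values are placeholders and the exact
   * nesting depends on the dump() implementations):
   *
   *   { "backfill_targets": [...], "waiting_on_backfill": [...],
   *     "last_backfill_started": "...", "backfill_info": {...},
   *     "peer_backfill_info": [...], "backfills_in_flight": [...],
   *     "recovering": [...], "pg_backend": {...} }
   */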
1074
1075 /// last backfill operation started
1076 hobject_t last_backfill_started;
1077 bool new_backfill;
1078
1079 int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
1080 PGBackend::RecoveryHandle *h);
1081 int prep_object_replica_deletes(const hobject_t& soid, eversion_t v,
1082 PGBackend::RecoveryHandle *h);
1083
1084 void finish_degraded_object(const hobject_t& oid);
1085
1086 // Cancels/resets pulls from peer
1087 void check_recovery_sources(const OSDMapRef& map) override ;
1088
1089 int recover_missing(
1090 const hobject_t& oid,
1091 eversion_t v,
1092 int priority,
1093 PGBackend::RecoveryHandle *h);
1094
1095 // low level ops
1096
1097 void _make_clone(
1098 OpContext *ctx,
1099 PGTransaction* t,
1100 ObjectContextRef obc,
1101 const hobject_t& head, const hobject_t& coid,
1102 object_info_t *poi);
1103 void execute_ctx(OpContext *ctx);
1104 void finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc=true);
1105 void reply_ctx(OpContext *ctx, int err);
1106 void reply_ctx(OpContext *ctx, int err, eversion_t v, version_t uv);
1107 void make_writeable(OpContext *ctx);
1108 void log_op_stats(OpContext *ctx);
1109
1110 void write_update_size_and_usage(object_stat_sum_t& stats, object_info_t& oi,
1111 interval_set<uint64_t>& modified, uint64_t offset,
1112 uint64_t length, bool write_full=false);
1113 void add_interval_usage(interval_set<uint64_t>& s, object_stat_sum_t& st);
1114
1115
1116 enum class cache_result_t {
1117 NOOP,
1118 BLOCKED_FULL,
1119 BLOCKED_PROMOTE,
1120 HANDLED_PROXY,
1121 HANDLED_REDIRECT,
1122 REPLIED_WITH_EAGAIN,
1123 };
1124 cache_result_t maybe_handle_cache_detail(OpRequestRef op,
1125 bool write_ordered,
1126 ObjectContextRef obc, int r,
1127 hobject_t missing_oid,
1128 bool must_promote,
1129 bool in_hit_set,
1130 ObjectContextRef *promote_obc);
1131 cache_result_t maybe_handle_manifest_detail(OpRequestRef op,
1132 bool write_ordered,
1133 ObjectContextRef obc);
1134 bool maybe_handle_manifest(OpRequestRef op,
1135 bool write_ordered,
1136 ObjectContextRef obc) {
1137 return cache_result_t::NOOP != maybe_handle_manifest_detail(
1138 op,
1139 write_ordered,
1140 obc);
1141 }
1142
1143 /**
1144 * This helper function is called from do_op if the ObjectContext lookup fails.
1145 * @returns true if the caching code is handling the Op, false otherwise.
1146 */
1147 bool maybe_handle_cache(OpRequestRef op,
1148 bool write_ordered,
1149 ObjectContextRef obc, int r,
1150 const hobject_t& missing_oid,
1151 bool must_promote,
1152 bool in_hit_set = false) {
1153 return cache_result_t::NOOP != maybe_handle_cache_detail(
1154 op,
1155 write_ordered,
1156 obc,
1157 r,
1158 missing_oid,
1159 must_promote,
1160 in_hit_set,
1161 nullptr);
1162 }
1163
1164 /**
1165 * This helper function checks if a promotion is needed.
1166 */
1167 bool maybe_promote(ObjectContextRef obc,
1168 const hobject_t& missing_oid,
1169 const object_locator_t& oloc,
1170 bool in_hit_set,
1171 uint32_t recency,
1172 OpRequestRef promote_op,
1173 ObjectContextRef *promote_obc = nullptr);
1174 /**
1175 * This helper function tells the client to redirect their request elsewhere.
1176 */
1177 void do_cache_redirect(OpRequestRef op);
1178 /**
1179 * This function attempts to start a promote. Either it succeeds,
1180 * or places op on a wait list. If op is null, failure means that
1181 * this is a noop. If a future user wants to be able to distinguish
1182 * these cases, a return value should be added.
1183 */
1184 void promote_object(
1185 ObjectContextRef obc, ///< [optional] obc
1186 const hobject_t& missing_object, ///< oid (if !obc)
1187 const object_locator_t& oloc, ///< locator for obc|oid
1188 OpRequestRef op, ///< [optional] client op
1189 ObjectContextRef *promote_obc = nullptr ///< [optional] new obc for object
1190 );
1191
1192 int prepare_transaction(OpContext *ctx);
1193 list<pair<OpRequestRef, OpContext*> > in_progress_async_reads;
1194 void complete_read_ctx(int result, OpContext *ctx);
1195
1196 // pg on-disk content
1197 void check_local() override;
1198
1199 void _clear_recovery_state() override;
1200
1201 bool start_recovery_ops(
1202 uint64_t max,
1203 ThreadPool::TPHandle &handle, uint64_t *started) override;
1204
1205 uint64_t recover_primary(uint64_t max, ThreadPool::TPHandle &handle);
1206 uint64_t recover_replicas(uint64_t max, ThreadPool::TPHandle &handle);
1207 hobject_t earliest_peer_backfill() const;
1208 bool all_peer_done() const;
1209 /**
1210 * @param work_started will be set to true if recover_backfill got anywhere
1211 * @returns the number of operations started
1212 */
1213 uint64_t recover_backfill(uint64_t max, ThreadPool::TPHandle &handle,
1214 bool *work_started);
1215
1216 /**
1217 * scan a (hash) range of objects in the current pg
1218 *
1219    * @param begin first item should be >= this value
1220    * @param min return at least this many items, unless we are done
1221    * @param max return no more than this many items
1222    * @param bi [out] resulting map of objects to eversion_t's
1223 */
1224 void scan_range(
1225 int min, int max, BackfillInterval *bi,
1226 ThreadPool::TPHandle &handle
1227 );
1228
1229 /// Update a hash range to reflect changes since the last scan
1230 void update_range(
1231 BackfillInterval *bi, ///< [in,out] interval to update
1232 ThreadPool::TPHandle &handle ///< [in] tp handle
1233 );
1234
1235 int prep_backfill_object_push(
1236 hobject_t oid, eversion_t v, ObjectContextRef obc,
1237 vector<pg_shard_t> peers,
1238 PGBackend::RecoveryHandle *h);
1239 void send_remove_op(const hobject_t& oid, eversion_t v, pg_shard_t peer);
1240
1241
1242 class C_OSD_OndiskWriteUnlock;
1243 class C_OSD_AppliedRecoveredObject;
1244 class C_OSD_CommittedPushedObject;
1245 class C_OSD_AppliedRecoveredObjectReplica;
1246 void sub_op_remove(OpRequestRef op);
1247
1248 void _applied_recovered_object(ObjectContextRef obc);
1249 void _applied_recovered_object_replica();
1250 void _committed_pushed_object(epoch_t epoch, eversion_t lc);
1251 void recover_got(hobject_t oid, eversion_t v);
1252
1253 // -- copyfrom --
1254 map<hobject_t, CopyOpRef> copy_ops;
1255
1256 int do_copy_get(OpContext *ctx, bufferlist::iterator& bp, OSDOp& op,
1257 ObjectContextRef& obc);
1258 int finish_copy_get();
1259
1260 void fill_in_copy_get_noent(OpRequestRef& op, hobject_t oid,
1261 OSDOp& osd_op);
1262
1263 /**
1264 * To copy an object, call start_copy.
1265 *
1266 * @param cb: The CopyCallback to be activated when the copy is complete
1267 * @param obc: The ObjectContext we are copying into
1268 * @param src: The source object
1269 * @param oloc: the source object locator
1270 * @param version: the version of the source object to copy (0 for any)
1271 */
1272 void start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src,
1273 object_locator_t oloc, version_t version, unsigned flags,
1274 bool mirror_snapset, unsigned src_obj_fadvise_flags,
1275 unsigned dest_obj_fadvise_flags);
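  /*
   * Hypothetical call sketch for start_copy() (the CEPH_OSD_COPY_FROM_FLAG_*
   * constants are real, but this particular combination and the surrounding
   * names are illustrative):
   *
   *   start_copy(cb, obc, src_oid, src_oloc,
   *              0,                                        // version: any
   *              CEPH_OSD_COPY_FROM_FLAG_FLUSH |
   *              CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY,   // flags
   *              false,                                    // mirror_snapset
   *              0, 0);                                    // fadvise flags
   */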
1276 void process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r);
1277 void _write_copy_chunk(CopyOpRef cop, PGTransaction *t);
1278 uint64_t get_copy_chunk_size() const {
1279 uint64_t size = cct->_conf->osd_copyfrom_max_chunk;
1280 if (pool.info.requires_aligned_append()) {
1281 uint64_t alignment = pool.info.required_alignment();
1282 if (size % alignment) {
1283 size += alignment - (size % alignment);
1284 }
1285 }
1286 return size;
1287 }
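  /*
   * Worked example for the rounding above (numbers are hypothetical): with
   * osd_copyfrom_max_chunk = 1000000 bytes and a required alignment of 65536
   * bytes (e.g. an EC stripe width), 1000000 % 65536 == 16960, so the chunk
   * grows by 65536 - 16960 = 48576 to 1048576 bytes (16 * 65536).  Pools that
   * do not require aligned appends keep the configured size unchanged.
   */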
1288 void _copy_some(ObjectContextRef obc, CopyOpRef cop);
1289 void finish_copyfrom(CopyFromCallback *cb);
1290 void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
1291 void cancel_copy(CopyOpRef cop, bool requeue);
1292 void cancel_copy_ops(bool requeue);
1293
1294 friend struct C_Copyfrom;
1295
1296 // -- flush --
1297 map<hobject_t, FlushOpRef> flush_ops;
1298
1299 /// start_flush takes ownership of on_flush iff ret == -EINPROGRESS
1300 int start_flush(
1301 OpRequestRef op, ObjectContextRef obc,
1302 bool blocking, hobject_t *pmissing,
1303 boost::optional<std::function<void()>> &&on_flush);
1304 void finish_flush(hobject_t oid, ceph_tid_t tid, int r);
1305 int try_flush_mark_clean(FlushOpRef fop);
1306 void cancel_flush(FlushOpRef fop, bool requeue);
1307 void cancel_flush_ops(bool requeue);
1308
1309   /// @return false if the clone has been evicted
1310 bool is_present_clone(hobject_t coid);
1311
1312 friend struct C_Flush;
1313
1314 // -- scrub --
1315 bool _range_available_for_scrub(
1316 const hobject_t &begin, const hobject_t &end) override;
1317 void scrub_snapshot_metadata(
1318 ScrubMap &map,
1319 const std::map<hobject_t, pair<uint32_t, uint32_t>> &missing_digest) override;
1320 void _scrub_clear_state() override;
1321 void _scrub_finish() override;
1322 object_stat_collection_t scrub_cstat;
1323
1324 void _split_into(pg_t child_pgid, PG *child,
1325 unsigned split_bits) override;
1326 void apply_and_flush_repops(bool requeue);
1327
1328 void calc_trim_to() override;
1329 int do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr);
1330 int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr);
1331
1332 // -- checksum --
1333 int do_checksum(OpContext *ctx, OSDOp& osd_op, bufferlist::iterator *bl_it);
1334 int finish_checksum(OSDOp& osd_op, Checksummer::CSumType csum_type,
1335 bufferlist::iterator *init_value_bl_it,
1336 const bufferlist &read_bl);
1337
1338 friend class C_ChecksumRead;
1339
1340 int do_extent_cmp(OpContext *ctx, OSDOp& osd_op);
1341 int finish_extent_cmp(OSDOp& osd_op, const bufferlist &read_bl);
1342
1343 friend class C_ExtentCmpRead;
1344
1345 int do_read(OpContext *ctx, OSDOp& osd_op);
1346 int do_sparse_read(OpContext *ctx, OSDOp& osd_op);
1347 int do_writesame(OpContext *ctx, OSDOp& osd_op);
1348
1349 bool pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata);
1350 int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter);
1351
1352 map<hobject_t, list<OpRequestRef>> in_progress_proxy_ops;
1353 void kick_proxy_ops_blocked(hobject_t& soid);
1354 void cancel_proxy_ops(bool requeue);
1355
1356 // -- proxyread --
1357 map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;
1358
1359 void do_proxy_read(OpRequestRef op, ObjectContextRef obc = NULL);
1360 void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
1361 void cancel_proxy_read(ProxyReadOpRef prdop);
1362
1363 friend struct C_ProxyRead;
1364
1365 // -- proxywrite --
1366 map<ceph_tid_t, ProxyWriteOpRef> proxywrite_ops;
1367
1368 void do_proxy_write(OpRequestRef op, const hobject_t& missing_oid, ObjectContextRef obc = NULL);
1369 void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r);
1370 void cancel_proxy_write(ProxyWriteOpRef pwop);
1371
1372 friend struct C_ProxyWrite_Commit;
1373
1374 public:
1375 PrimaryLogPG(OSDService *o, OSDMapRef curmap,
1376 const PGPool &_pool, spg_t p);
1377 ~PrimaryLogPG() override {}
1378
1379 int do_command(
1380 cmdmap_t cmdmap,
1381 ostream& ss,
1382 bufferlist& idata,
1383 bufferlist& odata,
1384 ConnectionRef conn,
1385 ceph_tid_t tid) override;
1386
1387 void do_request(
1388 OpRequestRef& op,
1389 ThreadPool::TPHandle &handle) override;
1390 void do_op(OpRequestRef& op) override;
1391 void record_write_error(OpRequestRef op, const hobject_t &soid,
1392 MOSDOpReply *orig_reply, int r);
1393 void do_pg_op(OpRequestRef op);
1394 void do_sub_op(OpRequestRef op) override;
1395 void do_sub_op_reply(OpRequestRef op) override;
1396 void do_scan(
1397 OpRequestRef op,
1398 ThreadPool::TPHandle &handle) override;
1399 void do_backfill(OpRequestRef op) override;
1400 void do_backfill_remove(OpRequestRef op);
1401
1402 void handle_backoff(OpRequestRef& op);
1403
1404 int trim_object(bool first, const hobject_t &coid, OpContextUPtr *ctxp);
1405 void snap_trimmer(epoch_t e) override;
1406 void kick_snap_trim() override;
1407 void snap_trimmer_scrub_complete() override;
1408 int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops);
1409
1410 int _get_tmap(OpContext *ctx, bufferlist *header, bufferlist *vals);
1411 int do_tmap2omap(OpContext *ctx, unsigned flags);
1412 int do_tmapup(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op);
1413 int do_tmapup_slow(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op, bufferlist& bl);
1414
1415 void do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn);
1416 private:
1417 int do_scrub_ls(MOSDOp *op, OSDOp *osd_op);
1418 hobject_t earliest_backfill() const;
1419 bool check_src_targ(const hobject_t& soid, const hobject_t& toid) const;
1420
1421 uint64_t temp_seq; ///< last id for naming temp objects
1422 /// generate a new temp object name
1423 hobject_t generate_temp_object(const hobject_t& target);
1424 /// generate a new temp object name (for recovery)
1425 hobject_t get_temp_recovery_object(const hobject_t& target,
1426 eversion_t version) override;
1427 int get_recovery_op_priority() const {
1428 int pri = 0;
1429 pool.info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
1430 return pri > 0 ? pri : cct->_conf->osd_recovery_op_priority;
1431 }
1432 void log_missing(unsigned missing,
1433 const boost::optional<hobject_t> &head,
1434 LogChannelRef clog,
1435 const spg_t &pgid,
1436 const char *func,
1437 const char *mode,
1438 bool allow_incomplete_clones);
1439 unsigned process_clones_to(const boost::optional<hobject_t> &head,
1440 const boost::optional<SnapSet> &snapset,
1441 LogChannelRef clog,
1442 const spg_t &pgid,
1443 const char *mode,
1444 bool allow_incomplete_clones,
1445 boost::optional<snapid_t> target,
1446 vector<snapid_t>::reverse_iterator *curclone,
1447 inconsistent_snapset_wrapper &snap_error);
1448
1449 public:
1450 coll_t get_coll() {
1451 return coll;
1452 }
1453 void split_colls(
1454 spg_t child,
1455 int split_bits,
1456 int seed,
1457 const pg_pool_t *pool,
1458 ObjectStore::Transaction *t) override {
1459 coll_t target = coll_t(child);
1460 PG::_create(*t, child, split_bits);
1461 t->split_collection(
1462 coll,
1463 split_bits,
1464 seed,
1465 target);
1466 PG::_init(*t, child, pool);
1467 }
1468 private:
1469
1470 struct DoSnapWork : boost::statechart::event< DoSnapWork > {
1471 DoSnapWork() : boost::statechart::event < DoSnapWork >() {}
1472 };
1473 struct KickTrim : boost::statechart::event< KickTrim > {
1474 KickTrim() : boost::statechart::event < KickTrim >() {}
1475 };
1476 struct RepopsComplete : boost::statechart::event< RepopsComplete > {
1477 RepopsComplete() : boost::statechart::event < RepopsComplete >() {}
1478 };
1479 struct ScrubComplete : boost::statechart::event< ScrubComplete > {
1480 ScrubComplete() : boost::statechart::event < ScrubComplete >() {}
1481 };
1482 struct TrimWriteUnblocked : boost::statechart::event< TrimWriteUnblocked > {
1483 TrimWriteUnblocked() : boost::statechart::event < TrimWriteUnblocked >() {}
1484 };
1485 struct Reset : boost::statechart::event< Reset > {
1486 Reset() : boost::statechart::event< Reset >() {}
1487 };
1488 struct SnapTrimReserved : boost::statechart::event< SnapTrimReserved > {
1489 SnapTrimReserved() : boost::statechart::event< SnapTrimReserved >() {}
1490 };
1491 struct SnapTrimTimerReady : boost::statechart::event< SnapTrimTimerReady > {
1492 SnapTrimTimerReady() : boost::statechart::event< SnapTrimTimerReady >() {}
1493 };
1494
1495 struct NotTrimming;
1496 struct SnapTrimmer : public boost::statechart::state_machine< SnapTrimmer, NotTrimming > {
1497 PrimaryLogPG *pg;
1498 explicit SnapTrimmer(PrimaryLogPG *pg) : pg(pg) {}
1499 void log_enter(const char *state_name);
1500 void log_exit(const char *state_name, utime_t duration);
1501 bool can_trim() {
1502 return pg->is_clean() && !pg->scrubber.active && !pg->snap_trimq.empty();
1503 }
1504 } snap_trimmer_machine;
1505
1506 struct WaitReservation;
1507 struct Trimming : boost::statechart::state< Trimming, SnapTrimmer, WaitReservation >, NamedState {
1508 typedef boost::mpl::list <
1509 boost::statechart::custom_reaction< KickTrim >,
1510 boost::statechart::transition< Reset, NotTrimming >
1511 > reactions;
1512
1513 set<hobject_t> in_flight;
1514 snapid_t snap_to_trim;
1515
1516 explicit Trimming(my_context ctx)
1517 : my_base(ctx),
1518 NamedState(context< SnapTrimmer >().pg, "Trimming") {
1519 context< SnapTrimmer >().log_enter(state_name);
1520 assert(context< SnapTrimmer >().can_trim());
1521 assert(in_flight.empty());
1522 }
1523 void exit() {
1524 context< SnapTrimmer >().log_exit(state_name, enter_time);
1525 auto *pg = context< SnapTrimmer >().pg;
1526 pg->osd->snap_reserver.cancel_reservation(pg->get_pgid());
1527 pg->state_clear(PG_STATE_SNAPTRIM);
1528 pg->publish_stats_to_osd();
1529 }
1530 boost::statechart::result react(const KickTrim&) {
1531 return discard_event();
1532 }
1533 };
1534
1535 /* SnapTrimmerStates */
1536 struct WaitTrimTimer : boost::statechart::state< WaitTrimTimer, Trimming >, NamedState {
1537 typedef boost::mpl::list <
1538 boost::statechart::custom_reaction< SnapTrimTimerReady >
1539 > reactions;
1540 Context *wakeup = nullptr;
1541 explicit WaitTrimTimer(my_context ctx)
1542 : my_base(ctx),
1543 NamedState(context< SnapTrimmer >().pg, "Trimming/WaitTrimTimer") {
1544 context< SnapTrimmer >().log_enter(state_name);
1545 assert(context<Trimming>().in_flight.empty());
1546 struct OnTimer : Context {
1547 PrimaryLogPGRef pg;
1548 epoch_t epoch;
1549 OnTimer(PrimaryLogPGRef pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
1550 void finish(int) override {
1551 pg->lock();
1552 if (!pg->pg_has_reset_since(epoch))
1553 pg->snap_trimmer_machine.process_event(SnapTrimTimerReady());
1554 pg->unlock();
1555 }
1556 };
1557 auto *pg = context< SnapTrimmer >().pg;
1558 if (pg->cct->_conf->osd_snap_trim_sleep > 0) {
1559 Mutex::Locker l(pg->osd->snap_sleep_lock);
1560 wakeup = pg->osd->snap_sleep_timer.add_event_after(
1561 pg->cct->_conf->osd_snap_trim_sleep,
1562 new OnTimer{pg, pg->get_osdmap()->get_epoch()});
1563 } else {
1564 post_event(SnapTrimTimerReady());
1565 }
1566 }
1567 void exit() {
1568 context< SnapTrimmer >().log_exit(state_name, enter_time);
1569 auto *pg = context< SnapTrimmer >().pg;
1570 if (wakeup) {
1571 Mutex::Locker l(pg->osd->snap_sleep_lock);
1572 pg->osd->snap_sleep_timer.cancel_event(wakeup);
1573 wakeup = nullptr;
1574 }
1575 }
1576 boost::statechart::result react(const SnapTrimTimerReady &) {
1577 wakeup = nullptr;
1578 if (!context< SnapTrimmer >().can_trim()) {
1579 post_event(KickTrim());
1580 return transit< NotTrimming >();
1581 } else {
1582 return transit< AwaitAsyncWork >();
1583 }
1584 }
1585 };
1586
1587 struct WaitRWLock : boost::statechart::state< WaitRWLock, Trimming >, NamedState {
1588 typedef boost::mpl::list <
1589 boost::statechart::custom_reaction< TrimWriteUnblocked >
1590 > reactions;
1591 explicit WaitRWLock(my_context ctx)
1592 : my_base(ctx),
1593 NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRWLock") {
1594 context< SnapTrimmer >().log_enter(state_name);
1595 assert(context<Trimming>().in_flight.empty());
1596 }
1597 void exit() {
1598 context< SnapTrimmer >().log_exit(state_name, enter_time);
1599 }
1600 boost::statechart::result react(const TrimWriteUnblocked&) {
1601 if (!context< SnapTrimmer >().can_trim()) {
1602 post_event(KickTrim());
1603 return transit< NotTrimming >();
1604 } else {
1605 return transit< AwaitAsyncWork >();
1606 }
1607 }
1608 };
1609
1610 struct WaitRepops : boost::statechart::state< WaitRepops, Trimming >, NamedState {
1611 typedef boost::mpl::list <
1612 boost::statechart::custom_reaction< RepopsComplete >
1613 > reactions;
1614 explicit WaitRepops(my_context ctx)
1615 : my_base(ctx),
1616 NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRepops") {
1617 context< SnapTrimmer >().log_enter(state_name);
1618 assert(!context<Trimming>().in_flight.empty());
1619 }
1620 void exit() {
1621 context< SnapTrimmer >().log_exit(state_name, enter_time);
1622 }
1623 boost::statechart::result react(const RepopsComplete&) {
1624 if (!context< SnapTrimmer >().can_trim()) {
1625 post_event(KickTrim());
1626 return transit< NotTrimming >();
1627 } else {
1628 return transit< WaitTrimTimer >();
1629 }
1630 }
1631 };
1632
1633 struct AwaitAsyncWork : boost::statechart::state< AwaitAsyncWork, Trimming >, NamedState {
1634 typedef boost::mpl::list <
1635 boost::statechart::custom_reaction< DoSnapWork >
1636 > reactions;
1637 explicit AwaitAsyncWork(my_context ctx);
1638 void exit() {
1639 context< SnapTrimmer >().log_exit(state_name, enter_time);
1640 }
1641 boost::statechart::result react(const DoSnapWork&);
1642 };
1643
1644 struct WaitReservation : boost::statechart::state< WaitReservation, Trimming >, NamedState {
1645 /* WaitReservation is a sub-state of trimming simply so that exiting Trimming
1646 * always cancels the reservation */
1647 typedef boost::mpl::list <
1648 boost::statechart::custom_reaction< SnapTrimReserved >
1649 > reactions;
1650 struct ReservationCB : public Context {
1651 PrimaryLogPGRef pg;
1652 bool canceled;
1653 ReservationCB(PrimaryLogPG *pg) : pg(pg), canceled(false) {}
1654 void finish(int) override {
1655 pg->lock();
1656 if (!canceled)
1657 pg->snap_trimmer_machine.process_event(SnapTrimReserved());
1658 pg->unlock();
1659 }
1660 void cancel() {
1661 assert(pg->is_locked());
1662 assert(!canceled);
1663 canceled = true;
1664 }
1665 };
1666 ReservationCB *pending = nullptr;
1667
1668 explicit WaitReservation(my_context ctx)
1669 : my_base(ctx),
1670 NamedState(context< SnapTrimmer >().pg, "Trimming/WaitReservation") {
1671 context< SnapTrimmer >().log_enter(state_name);
1672 assert(context<Trimming>().in_flight.empty());
1673 auto *pg = context< SnapTrimmer >().pg;
1674 pending = new ReservationCB(pg);
1675 pg->osd->snap_reserver.request_reservation(
1676 pg->get_pgid(),
1677 pending,
1678 0);
1679 pg->state_set(PG_STATE_SNAPTRIM_WAIT);
1680 pg->publish_stats_to_osd();
1681 }
1682 boost::statechart::result react(const SnapTrimReserved&);
1683 void exit() {
1684 context< SnapTrimmer >().log_exit(state_name, enter_time);
1685 if (pending)
1686 pending->cancel();
1687 pending = nullptr;
1688 auto *pg = context< SnapTrimmer >().pg;
1689 pg->state_clear(PG_STATE_SNAPTRIM_WAIT);
1690 pg->state_clear(PG_STATE_SNAPTRIM_ERROR);
1691 pg->publish_stats_to_osd();
1692 }
1693 };
1694
1695 struct WaitScrub : boost::statechart::state< WaitScrub, SnapTrimmer >, NamedState {
1696 typedef boost::mpl::list <
1697 boost::statechart::custom_reaction< ScrubComplete >,
1698 boost::statechart::custom_reaction< KickTrim >,
1699 boost::statechart::transition< Reset, NotTrimming >
1700 > reactions;
1701 explicit WaitScrub(my_context ctx)
1702 : my_base(ctx),
1703 NamedState(context< SnapTrimmer >().pg, "Trimming/WaitScrub") {
1704 context< SnapTrimmer >().log_enter(state_name);
1705 }
1706 void exit() {
1707 context< SnapTrimmer >().log_exit(state_name, enter_time);
1708 }
1709 boost::statechart::result react(const ScrubComplete&) {
1710 post_event(KickTrim());
1711 return transit< NotTrimming >();
1712 }
1713 boost::statechart::result react(const KickTrim&) {
1714 return discard_event();
1715 }
1716 };
1717
1718 struct NotTrimming : boost::statechart::state< NotTrimming, SnapTrimmer >, NamedState {
1719 typedef boost::mpl::list <
1720 boost::statechart::custom_reaction< KickTrim >,
1721 boost::statechart::transition< Reset, NotTrimming >
1722 > reactions;
1723 explicit NotTrimming(my_context ctx);
1724 void exit();
1725 boost::statechart::result react(const KickTrim&);
1726 };
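  /*
   * A rough sketch of the trim cycle through the states above (transitions
   * whose reactions are implemented in the .cc are assumed; the rest are
   * visible in this header):
   *
   *   NotTrimming --KickTrim--> Trimming (initial substate WaitReservation)
   *     WaitReservation --SnapTrimReserved-->   AwaitAsyncWork
   *     AwaitAsyncWork  --DoSnapWork-->         (issue trim repops)
   *     WaitRepops      --RepopsComplete-->     WaitTrimTimer
   *     WaitTrimTimer   --SnapTrimTimerReady--> AwaitAsyncWork  (next snap/object)
   *   Reset returns the machine to NotTrimming from any of these states.
   *
   * Events are injected with snap_trimmer_machine.process_event(...), as
   * release_object_locks() does above with TrimWriteUnblocked().
   */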
1727
1728 int _verify_no_head_clones(const hobject_t& soid,
1729 const SnapSet& ss);
1730   // create a local object if needed; does nothing for a
1731   // whiteout or when there is no change.
1732 void maybe_create_new_object(OpContext *ctx, bool ignore_transaction=false);
1733 int _delete_oid(OpContext *ctx, bool no_whiteout, bool try_no_whiteout);
1734 int _rollback_to(OpContext *ctx, ceph_osd_op& op);
1735 public:
1736 bool is_missing_object(const hobject_t& oid) const;
1737 bool is_unreadable_object(const hobject_t &oid) const {
1738 return is_missing_object(oid) ||
1739 !missing_loc.readable_with_acting(oid, actingset);
1740 }
1741 void maybe_kick_recovery(const hobject_t &soid);
1742 void wait_for_unreadable_object(const hobject_t& oid, OpRequestRef op);
1743 void wait_for_all_missing(OpRequestRef op);
1744
1745 bool is_degraded_or_backfilling_object(const hobject_t& oid);
1746 void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
1747
1748 void block_write_on_full_cache(
1749 const hobject_t& oid, OpRequestRef op);
1750 void block_for_clean(
1751 const hobject_t& oid, OpRequestRef op);
1752 void block_write_on_snap_rollback(
1753 const hobject_t& oid, ObjectContextRef obc, OpRequestRef op);
1754 void block_write_on_degraded_snap(const hobject_t& oid, OpRequestRef op);
1755
1756 bool maybe_await_blocked_snapset(const hobject_t &soid, OpRequestRef op);
1757 void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
1758 void kick_object_context_blocked(ObjectContextRef obc);
1759
1760 void maybe_force_recovery();
1761
1762 void mark_all_unfound_lost(
1763 int what,
1764 ConnectionRef con,
1765 ceph_tid_t tid);
1766 eversion_t pick_newest_available(const hobject_t& oid);
1767
1768 void do_update_log_missing(
1769 OpRequestRef &op);
1770
1771 void do_update_log_missing_reply(
1772 OpRequestRef &op);
1773
1774 void on_role_change() override;
1775 void on_pool_change() override;
1776 void _on_new_interval() override;
1777 void clear_async_reads();
1778 void on_change(ObjectStore::Transaction *t) override;
1779 void on_activate() override;
1780 void on_flushed() override;
1781 void on_removal(ObjectStore::Transaction *t) override;
1782 void on_shutdown() override;
1783 bool check_failsafe_full(ostream &ss) override;
1784 bool check_osdmap_full(const set<pg_shard_t> &missing_on) override;
1785 int rep_repair_primary_object(const hobject_t& soid, OpRequestRef op);
1786
1787 // attr cache handling
1788 void setattr_maybe_cache(
1789 ObjectContextRef obc,
1790 OpContext *op,
1791 PGTransaction *t,
1792 const string &key,
1793 bufferlist &val);
1794 void setattrs_maybe_cache(
1795 ObjectContextRef obc,
1796 OpContext *op,
1797 PGTransaction *t,
1798 map<string, bufferlist> &attrs);
1799 void rmattr_maybe_cache(
1800 ObjectContextRef obc,
1801 OpContext *op,
1802 PGTransaction *t,
1803 const string &key);
1804 int getattr_maybe_cache(
1805 ObjectContextRef obc,
1806 const string &key,
1807 bufferlist *val);
1808 int getattrs_maybe_cache(
1809 ObjectContextRef obc,
1810 map<string, bufferlist> *out);
1811 };
1812
1813 inline ostream& operator<<(ostream& out, const PrimaryLogPG::RepGather& repop)
1814 {
1815 out << "repgather(" << &repop
1816 << " " << repop.v
1817 << " rep_tid=" << repop.rep_tid
1818 << " committed?=" << repop.all_committed
1819 << " applied?=" << repop.all_applied
1820 << " r=" << repop.r
1821 << ")";
1822 return out;
1823 }
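/*
 * Example of the resulting log form (address and values are placeholders):
 *
 *   repgather(0x55d3e2a0 123'45 rep_tid=678 committed?=0 applied?=1 r=0)
 */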
1824
1825 inline ostream& operator<<(ostream& out,
1826 const PrimaryLogPG::ProxyWriteOpRef& pwop)
1827 {
1828 out << "proxywrite(" << &pwop
1829 << " " << pwop->user_version
1830 << " pwop_tid=" << pwop->objecter_tid;
1831 if (pwop->ctx->op)
1832 out << " op=" << *(pwop->ctx->op->get_req());
1833 out << ")";
1834 return out;
1835 }
1836
1837 void intrusive_ptr_add_ref(PrimaryLogPG::RepGather *repop);
1838 void intrusive_ptr_release(PrimaryLogPG::RepGather *repop);
1839
1840
1841 #endif