// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
 *
 * Author: Loic Dachary <loic@dachary.org>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef CEPH_REPLICATEDPG_H
#define CEPH_REPLICATEDPG_H

#include <boost/tuple/tuple.hpp>
#include "include/assert.h"
#include "PG.h"
#include "Watch.h"
#include "TierAgentState.h"
#include "messages/MOSDOpReply.h"
#include "common/Checksummer.h"
#include "common/sharedptr_registry.hpp"
#include "ReplicatedBackend.h"
#include "PGTransaction.h"

class CopyFromCallback;
class PromoteCallback;

class PrimaryLogPG;
class PGLSFilter;
class HitSet;
struct TierAgentState;
class MOSDOp;
class MOSDOpReply;
class OSDService;

void intrusive_ptr_add_ref(PrimaryLogPG *pg);
void intrusive_ptr_release(PrimaryLogPG *pg);
uint64_t get_with_id(PrimaryLogPG *pg);
void put_with_id(PrimaryLogPG *pg, uint64_t id);

#ifdef PG_DEBUG_REFS
  typedef TrackedIntPtr<PrimaryLogPG> PrimaryLogPGRef;
#else
  typedef boost::intrusive_ptr<PrimaryLogPG> PrimaryLogPGRef;
#endif

struct inconsistent_snapset_wrapper;

class PrimaryLogPG : public PG, public PGBackend::Listener {
  friend class OSD;
  friend class Watch;

public:
  MEMPOOL_CLASS_HELPERS();

  /*
   * state associated with a copy operation
   */
  struct OpContext;
  class CopyCallback;

  /**
   * CopyResults stores the object metadata of interest to a copy initiator.
   */
  struct CopyResults {
    ceph::real_time mtime; ///< the copy source's mtime
    uint64_t object_size; ///< the copied object's size
    bool started_temp_obj; ///< true if the callback needs to delete temp object
    hobject_t temp_oid; ///< temp object (if any)

    /**
     * Function to fill in transaction; if non-empty the callback
     * must execute it before any other accesses to the object
     * (in order to complete the copy).
     */
    std::function<void(PGTransaction *)> fill_in_final_tx;

    version_t user_version; ///< The copy source's user version
    bool should_requeue; ///< op should be requeued on cancel
    vector<snapid_t> snaps; ///< src's snaps (if clone)
    snapid_t snap_seq; ///< src's snap_seq (if head)
    librados::snap_set_t snapset; ///< src snapset (if head)
    bool mirror_snapset;
    bool has_omap;
    uint32_t flags; // object_copy_data_t::FLAG_*
    uint32_t source_data_digest, source_omap_digest;
    uint32_t data_digest, omap_digest;
    mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
    map<string, bufferlist> attrs; // xattrs
    uint64_t truncate_seq;
    uint64_t truncate_size;
    bool is_data_digest() {
      return flags & object_copy_data_t::FLAG_DATA_DIGEST;
    }
    bool is_omap_digest() {
      return flags & object_copy_data_t::FLAG_OMAP_DIGEST;
    }
    CopyResults()
      : object_size(0), started_temp_obj(false),
        user_version(0),
        should_requeue(false), mirror_snapset(false),
        has_omap(false),
        flags(0),
        source_data_digest(-1), source_omap_digest(-1),
        data_digest(-1), omap_digest(-1),
        truncate_seq(0), truncate_size(0)
    {}
  };

  struct CopyOp {
    CopyCallback *cb;
    ObjectContextRef obc;
    hobject_t src;
    object_locator_t oloc;
    unsigned flags;
    bool mirror_snapset;

    CopyResults results;

    ceph_tid_t objecter_tid;
    ceph_tid_t objecter_tid2;

    object_copy_cursor_t cursor;
    map<string,bufferlist> attrs;
    bufferlist data;
    bufferlist omap_header;
    bufferlist omap_data;
    int rval;

    object_copy_cursor_t temp_cursor;

    /*
     * A CopyOp proceeds in two steps:
     * step 1: read the data (attrs/omap/data) from the source object
     * step 2: write that data out to create the new object
     * src_obj_fadvise_flags is used in step 1;
     * dest_obj_fadvise_flags is used in step 2.
     */
    unsigned src_obj_fadvise_flags;
    unsigned dest_obj_fadvise_flags;

    CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s,
           object_locator_t l,
           version_t v,
           unsigned f,
           bool ms,
           unsigned src_obj_fadvise_flags,
           unsigned dest_obj_fadvise_flags)
      : cb(cb_), obc(_obc), src(s), oloc(l), flags(f),
        mirror_snapset(ms),
        objecter_tid(0),
        objecter_tid2(0),
        rval(-1),
        src_obj_fadvise_flags(src_obj_fadvise_flags),
        dest_obj_fadvise_flags(dest_obj_fadvise_flags)
    {
      results.user_version = v;
      results.mirror_snapset = mirror_snapset;
    }
  };
  typedef ceph::shared_ptr<CopyOp> CopyOpRef;

  /**
   * The CopyCallback class defines an interface for completions to the
   * copy_start code. Users of the copy infrastructure must implement
   * one and give an instance of the class to start_copy.
   *
   * The implementer is responsible for making sure that the CopyCallback
   * can associate itself with the correct copy operation.
   */
  typedef boost::tuple<int, CopyResults*> CopyCallbackResults;

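  // Note (descriptive, not from the source comments): the int in a
  // CopyCallbackResults carries the copy's return code, and the
  // CopyResults* points at the metadata gathered from the source object.
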
  friend class CopyFromCallback;
  friend class CopyFromFinisher;
  friend class PromoteCallback;

  struct ProxyReadOp {
    OpRequestRef op;
    hobject_t soid;
    ceph_tid_t objecter_tid;
    vector<OSDOp> &ops;
    version_t user_version;
    int data_offset;
    bool canceled; ///< true if canceled

    ProxyReadOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops)
      : op(_op), soid(oid),
        objecter_tid(0), ops(_ops),
        user_version(0), data_offset(0),
        canceled(false) { }
  };
  typedef ceph::shared_ptr<ProxyReadOp> ProxyReadOpRef;

  struct ProxyWriteOp {
    OpContext *ctx;
    OpRequestRef op;
    hobject_t soid;
    ceph_tid_t objecter_tid;
    vector<OSDOp> &ops;
    version_t user_version;
    bool sent_reply;
    utime_t mtime;
    bool canceled;
    osd_reqid_t reqid;

    ProxyWriteOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops, osd_reqid_t _reqid)
      : ctx(NULL), op(_op), soid(oid),
        objecter_tid(0), ops(_ops),
        user_version(0), sent_reply(false),
        canceled(false),
        reqid(_reqid) { }
  };
  typedef ceph::shared_ptr<ProxyWriteOp> ProxyWriteOpRef;

  struct FlushOp {
    ObjectContextRef obc; ///< obc we are flushing
    OpRequestRef op; ///< initiating op
    list<OpRequestRef> dup_ops; ///< bandwagon jumpers
    version_t flushed_version; ///< user version we are flushing
    ceph_tid_t objecter_tid; ///< copy-from request tid
    int rval; ///< copy-from result
    bool blocking; ///< whether we are blocking updates
    bool removal; ///< we are removing the backend object
    boost::optional<std::function<void()>> on_flush; ///< callback, may be null

    FlushOp()
      : flushed_version(0), objecter_tid(0), rval(0),
        blocking(false), removal(false) {}
    ~FlushOp() { assert(!on_flush); }
  };
  typedef ceph::shared_ptr<FlushOp> FlushOpRef;

  boost::scoped_ptr<PGBackend> pgbackend;
  PGBackend *get_pgbackend() override {
    return pgbackend.get();
  }

  /// Listener methods
  DoutPrefixProvider *get_dpp() override {
    return this;
  }

  void on_local_recover(
    const hobject_t &oid,
    const ObjectRecoveryInfo &recovery_info,
    ObjectContextRef obc,
    bool is_delete,
    ObjectStore::Transaction *t
    ) override;
  void on_peer_recover(
    pg_shard_t peer,
    const hobject_t &oid,
    const ObjectRecoveryInfo &recovery_info
    ) override;
  void begin_peer_recover(
    pg_shard_t peer,
    const hobject_t oid) override;
  void on_global_recover(
    const hobject_t &oid,
    const object_stat_sum_t &stat_diff,
    bool is_delete) override;
  void failed_push(const list<pg_shard_t> &from, const hobject_t &soid) override;
  void primary_failed(const hobject_t &soid) override;
  bool primary_error(const hobject_t& soid, eversion_t v) override;
  void cancel_pull(const hobject_t &soid) override;
  void apply_stats(
    const hobject_t &soid,
    const object_stat_sum_t &delta_stats) override;
  void on_primary_error(const hobject_t &oid, eversion_t v) override;
  void remove_missing_object(const hobject_t &oid,
                             eversion_t v,
                             Context *on_complete) override;

  template<class T> class BlessedGenContext;
  class BlessedContext;
  Context *bless_context(Context *c) override;

  GenContext<ThreadPool::TPHandle&> *bless_gencontext(
    GenContext<ThreadPool::TPHandle&> *c) override;

  void send_message(int to_osd, Message *m) override {
    osd->send_message_osd_cluster(to_osd, m, get_osdmap()->get_epoch());
  }
  void queue_transaction(ObjectStore::Transaction&& t,
                         OpRequestRef op) override {
    osd->store->queue_transaction(osr.get(), std::move(t), 0, 0, 0, op);
  }
  void queue_transactions(vector<ObjectStore::Transaction>& tls,
                          OpRequestRef op) override {
    osd->store->queue_transactions(osr.get(), tls, 0, 0, 0, op, NULL);
  }
  epoch_t get_epoch() const override {
    return get_osdmap()->get_epoch();
  }
  epoch_t get_interval_start_epoch() const override {
    return info.history.same_interval_since;
  }
  epoch_t get_last_peering_reset_epoch() const override {
    return get_last_peering_reset();
  }
  const set<pg_shard_t> &get_actingbackfill_shards() const override {
    return actingbackfill;
  }
  const set<pg_shard_t> &get_acting_shards() const override {
    return actingset;
  }
  const set<pg_shard_t> &get_backfill_shards() const override {
    return backfill_targets;
  }

  std::string gen_dbg_prefix() const override { return gen_prefix(); }

  const map<hobject_t, set<pg_shard_t>>
    &get_missing_loc_shards() const override {
    return missing_loc.get_missing_locs();
  }
  const map<pg_shard_t, pg_missing_t> &get_shard_missing() const override {
    return peer_missing;
  }
  using PGBackend::Listener::get_shard_missing;
  const map<pg_shard_t, pg_info_t> &get_shard_info() const override {
    return peer_info;
  }
  using PGBackend::Listener::get_shard_info;
  const pg_missing_tracker_t &get_local_missing() const override {
    return pg_log.get_missing();
  }
  const PGLog &get_log() const override {
    return pg_log;
  }
  bool pgb_is_primary() const override {
    return is_primary();
  }
  OSDMapRef pgb_get_osdmap() const override {
    return get_osdmap();
  }
  const pg_info_t &get_info() const override {
    return info;
  }
  const pg_pool_t &get_pool() const override {
    return pool.info;
  }

  ObjectContextRef get_obc(
    const hobject_t &hoid,
    const map<string, bufferlist> &attrs) override {
    return get_object_context(hoid, true, &attrs);
  }

  bool try_lock_for_read(
    const hobject_t &hoid,
    ObcLockManager &manager) override {
    if (is_missing_object(hoid))
      return false;
    auto obc = get_object_context(hoid, false, nullptr);
    if (!obc)
      return false;
    return manager.try_get_read_lock(hoid, obc);
  }

  void release_locks(ObcLockManager &manager) override {
    release_object_locks(manager);
  }

  void pgb_set_object_snap_mapping(
    const hobject_t &soid,
    const set<snapid_t> &snaps,
    ObjectStore::Transaction *t) override {
    return update_object_snap_mapping(t, soid, snaps);
  }
  void pgb_clear_object_snap_mapping(
    const hobject_t &soid,
    ObjectStore::Transaction *t) override {
    return clear_object_snap_mapping(t, soid);
  }

  void log_operation(
    const vector<pg_log_entry_t> &logv,
    const boost::optional<pg_hit_set_history_t> &hset_history,
    const eversion_t &trim_to,
    const eversion_t &roll_forward_to,
    bool transaction_applied,
    ObjectStore::Transaction &t) override {
    if (hset_history) {
      info.hit_set = *hset_history;
    }
    append_log(logv, trim_to, roll_forward_to, t, transaction_applied);
  }

  struct C_OSD_OnApplied;
  void op_applied(
    const eversion_t &applied_version) override;

  bool should_send_op(
    pg_shard_t peer,
    const hobject_t &hoid) override {
    if (peer == get_primary())
      return true;
    assert(peer_info.count(peer));
    bool should_send =
      hoid.pool != (int64_t)info.pgid.pool() ||
      hoid <= last_backfill_started ||
      hoid <= peer_info[peer].last_backfill;
    if (!should_send)
      assert(is_backfill_targets(peer));
    return should_send;
  }
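  // In other words (descriptive restatement of the predicate above): ops are
  // always sent to the primary; a backfill target only receives ops for
  // objects it is known to already have (at or before its last_backfill, or
  // at or before last_backfill_started), or for objects whose pool differs
  // from this PG's pool and which are therefore outside the backfill ordering.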

  void update_peer_last_complete_ondisk(
    pg_shard_t fromosd,
    eversion_t lcod) override {
    peer_last_complete_ondisk[fromosd] = lcod;
  }

  void update_last_complete_ondisk(
    eversion_t lcod) override {
    last_complete_ondisk = lcod;
  }

  void update_stats(
    const pg_stat_t &stat) override {
    info.stats = stat;
  }

  void schedule_recovery_work(
    GenContext<ThreadPool::TPHandle&> *c) override;

  pg_shard_t whoami_shard() const override {
    return pg_whoami;
  }
  spg_t primary_spg_t() const override {
    return spg_t(info.pgid.pgid, primary.shard);
  }
  pg_shard_t primary_shard() const override {
    return primary;
  }
  uint64_t min_peer_features() const override {
    return get_min_peer_features();
  }

  void send_message_osd_cluster(
    int peer, Message *m, epoch_t from_epoch) override;
  void send_message_osd_cluster(
    Message *m, Connection *con) override;
  void send_message_osd_cluster(
    Message *m, const ConnectionRef& con) override;
  ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch) override;
  entity_name_t get_cluster_msgr_name() override {
    return osd->get_cluster_msgr_name();
  }

  PerfCounters *get_logger() override;

  ceph_tid_t get_tid() override { return osd->get_tid(); }

  LogClientTemp clog_error() override { return osd->clog->error(); }
  LogClientTemp clog_warn() override { return osd->clog->warn(); }

  struct watch_disconnect_t {
    uint64_t cookie;
    entity_name_t name;
    bool send_disconnect;
    watch_disconnect_t(uint64_t c, entity_name_t n, bool sd)
      : cookie(c), name(n), send_disconnect(sd) {}
  };
  void complete_disconnect_watches(
    ObjectContextRef obc,
    const list<watch_disconnect_t> &to_disconnect);

  struct OpFinisher {
    virtual ~OpFinisher() {
    }

    virtual int execute() = 0;
  };

  /*
   * Capture all object state associated with an in-progress read or write.
   */
  struct OpContext {
    OpRequestRef op;
    osd_reqid_t reqid;
    vector<OSDOp> *ops;

    const ObjectState *obs; // Old objectstate
    const SnapSet *snapset; // Old snapset

    ObjectState new_obs; // resulting ObjectState
    SnapSet new_snapset; // resulting SnapSet (in case of a write)
    //pg_stat_t new_stats; // resulting Stats
    object_stat_sum_t delta_stats;

    bool modify; // (force) modification (even if op_t is empty)
    bool user_modify; // user-visible modification
    bool undirty; // user explicitly un-dirtying this object
    bool cache_evict; ///< true if this is a cache eviction
    bool ignore_cache; ///< true if IGNORE_CACHE flag is set
    bool ignore_log_op_stats; // don't log op stats
    bool update_log_only; ///< this is a write that returned an error - just record in pg log for dup detection

    // side effects
    list<pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
    list<watch_disconnect_t> watch_disconnects; ///< old watch + send_discon
    list<notify_info_t> notifies;
    struct NotifyAck {
      boost::optional<uint64_t> watch_cookie;
      uint64_t notify_id;
      bufferlist reply_bl;
      explicit NotifyAck(uint64_t notify_id) : notify_id(notify_id) {}
      NotifyAck(uint64_t notify_id, uint64_t cookie, bufferlist& rbl)
        : watch_cookie(cookie), notify_id(notify_id) {
        reply_bl.claim(rbl);
      }
    };
    list<NotifyAck> notify_acks;

    uint64_t bytes_written, bytes_read;

    utime_t mtime;
    SnapContext snapc; // writer snap context
    eversion_t at_version; // pg's current version pointer
    version_t user_at_version; // pg's current user version pointer

    int current_osd_subop_num;

    PGTransactionUPtr op_t;
    vector<pg_log_entry_t> log;
    boost::optional<pg_hit_set_history_t> updated_hset_history;

    interval_set<uint64_t> modified_ranges;
    ObjectContextRef obc;
    ObjectContextRef clone_obc; // if we created a clone
    ObjectContextRef snapset_obc; // if we created/deleted a snapdir

    // FIXME: we may want to kill this msgr hint off at some point!
    boost::optional<int> data_off = boost::none;

    MOSDOpReply *reply;

    PrimaryLogPG *pg;

    int num_read; ///< count read ops
    int num_write; ///< count update ops

    mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > extra_reqids;

    hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking

    list<std::function<void()>> on_applied;
    list<std::function<void()>> on_committed;
    list<std::function<void()>> on_finish;
    list<std::function<void()>> on_success;
    template <typename F>
    void register_on_finish(F &&f) {
      on_finish.emplace_back(std::forward<F>(f));
    }
    template <typename F>
    void register_on_success(F &&f) {
      on_success.emplace_back(std::forward<F>(f));
    }
    template <typename F>
    void register_on_applied(F &&f) {
      on_applied.emplace_back(std::forward<F>(f));
    }
    template <typename F>
    void register_on_commit(F &&f) {
      on_committed.emplace_back(std::forward<F>(f));
    }

    bool sent_reply;

    // pending async reads <off, len, op_flags> -> <outbl, outr>
    list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
              pair<bufferlist*, Context*> > > pending_async_reads;
    int inflightreads;
    friend struct OnReadComplete;
    void start_async_reads(PrimaryLogPG *pg);
    void finish_read(PrimaryLogPG *pg);
    bool async_reads_complete() {
      return inflightreads == 0;
    }

    ObjectContext::RWState::State lock_type;
    ObcLockManager lock_manager;

    std::map<int, std::unique_ptr<OpFinisher>> op_finishers;

    OpContext(const OpContext& other);
    const OpContext& operator=(const OpContext& other);

    OpContext(OpRequestRef _op, osd_reqid_t _reqid, vector<OSDOp>* _ops,
              ObjectContextRef& obc,
              PrimaryLogPG *_pg) :
      op(_op), reqid(_reqid), ops(_ops),
      obs(&obc->obs),
      snapset(0),
      new_obs(obs->oi, obs->exists),
      modify(false), user_modify(false), undirty(false), cache_evict(false),
      ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
      bytes_written(0), bytes_read(0), user_at_version(0),
      current_osd_subop_num(0),
      obc(obc),
      reply(NULL), pg(_pg),
      num_read(0),
      num_write(0),
      sent_reply(false),
      inflightreads(0),
      lock_type(ObjectContext::RWState::RWNONE) {
      if (obc->ssc) {
        new_snapset = obc->ssc->snapset;
        snapset = &obc->ssc->snapset;
      }
    }
    OpContext(OpRequestRef _op, osd_reqid_t _reqid,
              vector<OSDOp>* _ops, PrimaryLogPG *_pg) :
      op(_op), reqid(_reqid), ops(_ops), obs(NULL), snapset(0),
      modify(false), user_modify(false), undirty(false), cache_evict(false),
      ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
      bytes_written(0), bytes_read(0), user_at_version(0),
      current_osd_subop_num(0),
      reply(NULL), pg(_pg),
      num_read(0),
      num_write(0),
      inflightreads(0),
      lock_type(ObjectContext::RWState::RWNONE) {}
    void reset_obs(ObjectContextRef obc) {
      new_obs = ObjectState(obc->obs.oi, obc->obs.exists);
      if (obc->ssc) {
        new_snapset = obc->ssc->snapset;
        snapset = &obc->ssc->snapset;
      }
    }
    ~OpContext() {
      assert(!op_t);
      if (reply)
        reply->put();
      for (list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
                     pair<bufferlist*, Context*> > >::iterator i =
             pending_async_reads.begin();
           i != pending_async_reads.end();
           pending_async_reads.erase(i++)) {
        delete i->second.second;
      }
    }
    uint64_t get_features() {
      if (op && op->get_req()) {
        return op->get_req()->get_connection()->get_features();
      }
      return -1ull;
    }
  };
  using OpContextUPtr = std::unique_ptr<OpContext>;
  friend struct OpContext;

  /*
   * State on the PG primary associated with the replicated mutation
   */
  class RepGather {
  public:
    hobject_t hoid;
    OpRequestRef op;
    xlist<RepGather*>::item queue_item;
    int nref;

    eversion_t v;
    int r = 0;

    ceph_tid_t rep_tid;

    bool rep_aborted, rep_done;

    bool all_applied;
    bool all_committed;
    const bool applies_with_commit;

    utime_t start;

    eversion_t pg_local_last_complete;

    ObcLockManager lock_manager;

    list<std::function<void()>> on_applied;
    list<std::function<void()>> on_committed;
    list<std::function<void()>> on_success;
    list<std::function<void()>> on_finish;

    RepGather(
      OpContext *c, ceph_tid_t rt,
      eversion_t lc,
      bool applies_with_commit) :
      hoid(c->obc->obs.oi.soid),
      op(c->op),
      queue_item(this),
      nref(1),
      rep_tid(rt),
      rep_aborted(false), rep_done(false),
      all_applied(false), all_committed(false),
      applies_with_commit(applies_with_commit),
      pg_local_last_complete(lc),
      lock_manager(std::move(c->lock_manager)),
      on_applied(std::move(c->on_applied)),
      on_committed(std::move(c->on_committed)),
      on_success(std::move(c->on_success)),
      on_finish(std::move(c->on_finish)) {}

    RepGather(
      ObcLockManager &&manager,
      OpRequestRef &&o,
      boost::optional<std::function<void(void)> > &&on_complete,
      ceph_tid_t rt,
      eversion_t lc,
      bool applies_with_commit,
      int r) :
      op(o),
      queue_item(this),
      nref(1),
      r(r),
      rep_tid(rt),
      rep_aborted(false), rep_done(false),
      all_applied(false), all_committed(false),
      applies_with_commit(applies_with_commit),
      pg_local_last_complete(lc),
      lock_manager(std::move(manager)) {
      if (on_complete) {
        on_success.push_back(std::move(*on_complete));
      }
    }

    RepGather *get() {
      nref++;
      return this;
    }
    void put() {
      assert(nref > 0);
      if (--nref == 0) {
        assert(on_applied.empty());
        delete this;
        //generic_dout(0) << "deleting " << this << dendl;
      }
    }
  };


protected:

  /**
   * Grabs locks for OpContext, should be cleaned up in close_op_ctx
   *
   * @param ctx [in,out] ctx to get locks for
   * @return true on success, false if we are queued
   */
  bool get_rw_locks(bool write_ordered, OpContext *ctx) {
    /* If snapset_obc, !obc->obs->exists and we will always take the
     * snapdir lock *before* the head lock.  Since all callers will do
     * this (read or write) if we get the first we will be guaranteed
     * to get the second.
     */
    if (write_ordered && ctx->op->may_read()) {
      ctx->lock_type = ObjectContext::RWState::RWEXCL;
    } else if (write_ordered) {
      ctx->lock_type = ObjectContext::RWState::RWWRITE;
    } else {
      assert(ctx->op->may_read());
      ctx->lock_type = ObjectContext::RWState::RWREAD;
    }

    if (ctx->snapset_obc) {
      assert(!ctx->obc->obs.exists);
      if (!ctx->lock_manager.get_lock_type(
            ctx->lock_type,
            ctx->snapset_obc->obs.oi.soid,
            ctx->snapset_obc,
            ctx->op)) {
        ctx->lock_type = ObjectContext::RWState::RWNONE;
        return false;
      }
    }
    if (ctx->lock_manager.get_lock_type(
          ctx->lock_type,
          ctx->obc->obs.oi.soid,
          ctx->obc,
          ctx->op)) {
      return true;
    } else {
      assert(!ctx->snapset_obc);
      ctx->lock_type = ObjectContext::RWState::RWNONE;
      return false;
    }
  }

  /**
   * Cleans up OpContext
   *
   * @param ctx [in] ctx to clean up
   */
  void close_op_ctx(OpContext *ctx);

  /**
   * Releases locks
   *
   * @param manager [in] manager with locks to release
   */
  void release_object_locks(
    ObcLockManager &lock_manager) {
    list<pair<hobject_t, list<OpRequestRef> > > to_req;
    bool requeue_recovery = false;
    bool requeue_snaptrim = false;
    lock_manager.put_locks(
      &to_req,
      &requeue_recovery,
      &requeue_snaptrim);
    if (requeue_recovery)
      queue_recovery();
    if (requeue_snaptrim)
      snap_trimmer_machine.process_event(TrimWriteUnblocked());

    if (!to_req.empty()) {
      // requeue at front of scrub blocking queue if we are blocked by scrub
      for (auto &&p: to_req) {
        if (scrubber.write_blocked_by_scrub(p.first.get_head())) {
          waiting_for_scrub.splice(
            waiting_for_scrub.begin(),
            p.second,
            p.second.begin(),
            p.second.end());
        } else {
          requeue_ops(p.second);
        }
      }
    }
  }

  // replica ops
  // [primary|tail]
  xlist<RepGather*> repop_queue;

  friend class C_OSD_RepopApplied;
  friend class C_OSD_RepopCommit;
  void repop_all_applied(RepGather *repop);
  void repop_all_committed(RepGather *repop);
  void eval_repop(RepGather*);
  void issue_repop(RepGather *repop, OpContext *ctx);
  RepGather *new_repop(
    OpContext *ctx,
    ObjectContextRef obc,
    ceph_tid_t rep_tid);
  boost::intrusive_ptr<RepGather> new_repop(
    eversion_t version,
    int r,
    ObcLockManager &&manager,
    OpRequestRef &&op,
    boost::optional<std::function<void(void)> > &&on_complete);
  void remove_repop(RepGather *repop);

  OpContextUPtr simple_opc_create(ObjectContextRef obc);
  void simple_opc_submit(OpContextUPtr ctx);

  /**
   * Merge entries atomically into all actingbackfill osds
   * adjusting missing and recovery state as necessary.
   *
   * Also used to store error log entries for dup detection.
   */
  void submit_log_entries(
    const mempool::osd_pglog::list<pg_log_entry_t> &entries,
    ObcLockManager &&manager,
    boost::optional<std::function<void(void)> > &&on_complete,
    OpRequestRef op = OpRequestRef(),
    int r = 0);
  struct LogUpdateCtx {
    boost::intrusive_ptr<RepGather> repop;
    set<pg_shard_t> waiting_on;
  };
  void cancel_log_updates();
  map<ceph_tid_t, LogUpdateCtx> log_entry_update_waiting_on;


  // hot/cold tracking
  HitSetRef hit_set; ///< currently accumulating HitSet
  utime_t hit_set_start_stamp; ///< time the current HitSet started recording


  void hit_set_clear(); ///< discard any HitSet state
  void hit_set_setup(); ///< initialize HitSet state
  void hit_set_create(); ///< create a new HitSet
  void hit_set_persist(); ///< persist hit info
  bool hit_set_apply_log(); ///< apply log entries to update in-memory HitSet
  void hit_set_trim(OpContextUPtr &ctx, unsigned max); ///< discard old HitSets
  void hit_set_in_memory_trim(uint32_t max_in_memory); ///< discard old in memory HitSets
  void hit_set_remove_all();

  hobject_t get_hit_set_current_object(utime_t stamp);
  hobject_t get_hit_set_archive_object(utime_t start,
                                       utime_t end,
                                       bool using_gmt);

  // agent
  boost::scoped_ptr<TierAgentState> agent_state;

  void agent_setup(); ///< initialize agent state
  bool agent_work(int max) override ///< entry point to do some agent work
  {
    return agent_work(max, max);
  }
  bool agent_work(int max, int agent_flush_quota) override;
  bool agent_maybe_flush(ObjectContextRef& obc); ///< maybe flush
  bool agent_maybe_evict(ObjectContextRef& obc, bool after_flush); ///< maybe evict

  void agent_load_hit_sets(); ///< load HitSets, if needed

  /// estimate object atime and temperature
  ///
  /// @param oid [in] object name
  /// @param temperature [out] relative temperature (considers both access time and frequency)
  void agent_estimate_temp(const hobject_t& oid, int *temperature);

  /// stop the agent
  void agent_stop() override;
  void agent_delay() override;

  /// clear agent state
  void agent_clear() override;

  /// choose (new) agent mode(s), returns true if op is requeued
  bool agent_choose_mode(bool restart = false, OpRequestRef op = OpRequestRef());
  void agent_choose_mode_restart() override;

  /// true if we can send an ondisk/commit for v
  bool already_complete(eversion_t v);
  /// true if we can send an ack for v
  bool already_ack(eversion_t v);

  // projected object info
  SharedLRU<hobject_t, ObjectContext> object_contexts;
  // map from oid.snapdir() to SnapSetContext *
  map<hobject_t, SnapSetContext*> snapset_contexts;
  Mutex snapset_contexts_lock;

  // debug order that client ops are applied
  map<hobject_t, map<client_t, ceph_tid_t>> debug_op_order;

  void populate_obc_watchers(ObjectContextRef obc);
  void check_blacklisted_obc_watchers(ObjectContextRef obc);
  void check_blacklisted_watchers() override;
  void get_watchers(list<obj_watch_item_t> &pg_watchers) override;
  void get_obc_watchers(ObjectContextRef obc, list<obj_watch_item_t> &pg_watchers);
public:
  void handle_watch_timeout(WatchRef watch);
protected:

  ObjectContextRef create_object_context(const object_info_t& oi, SnapSetContext *ssc);
  ObjectContextRef get_object_context(
    const hobject_t& soid,
    bool can_create,
    const map<string, bufferlist> *attrs = 0
    );

  void context_registry_on_change();
  void object_context_destructor_callback(ObjectContext *obc);
  class C_PG_ObjectContext;

  int find_object_context(const hobject_t& oid,
                          ObjectContextRef *pobc,
                          bool can_create,
                          bool map_snapid_to_clone=false,
                          hobject_t *missing_oid=NULL);

  void add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t *stat);

  void get_src_oloc(const object_t& oid, const object_locator_t& oloc, object_locator_t& src_oloc);

  SnapSetContext *get_snapset_context(
    const hobject_t& oid,
    bool can_create,
    const map<string, bufferlist> *attrs = 0,
    bool oid_existed = true // indicates whether this oid existed in the backend
    );
  void register_snapset_context(SnapSetContext *ssc) {
    Mutex::Locker l(snapset_contexts_lock);
    _register_snapset_context(ssc);
  }
  void _register_snapset_context(SnapSetContext *ssc) {
    assert(snapset_contexts_lock.is_locked());
    if (!ssc->registered) {
      assert(snapset_contexts.count(ssc->oid) == 0);
      ssc->registered = true;
      snapset_contexts[ssc->oid] = ssc;
    }
  }
  void put_snapset_context(SnapSetContext *ssc);

  map<hobject_t, ObjectContextRef> recovering;

  /*
   * Backfill
   *
   * peer_info[backfill_target].last_backfill == info.last_backfill on the peer.
   *
   * objects prior to peer_info[backfill_target].last_backfill
   *   - are on the peer
   *   - are included in the peer stats
   *
   * objects \in (last_backfill, last_backfill_started]
   *   - are on the peer or are in backfills_in_flight
   *   - are not included in pg stats (yet)
   *   - have their stats in pending_backfill_updates on the primary
   */
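  /*
   * Illustration (not from the source): with objects A < B < C < D in
   * backfill (hash) order, a peer whose last_backfill is B and a primary
   * whose last_backfill_started is D, the objects in (B, D] such as C and D
   * are either already on the peer or tracked in backfills_in_flight, and
   * their stat updates sit in pending_backfill_updates rather than being
   * folded into the pg stats until last_backfill catches up.
   */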
  set<hobject_t> backfills_in_flight;
  map<hobject_t, pg_stat_t> pending_backfill_updates;

  void dump_recovery_info(Formatter *f) const override {
    f->open_array_section("backfill_targets");
    for (set<pg_shard_t>::const_iterator p = backfill_targets.begin();
         p != backfill_targets.end(); ++p)
      f->dump_stream("replica") << *p;
    f->close_section();
    f->open_array_section("waiting_on_backfill");
    for (set<pg_shard_t>::const_iterator p = waiting_on_backfill.begin();
         p != waiting_on_backfill.end(); ++p)
      f->dump_stream("osd") << *p;
    f->close_section();
    f->dump_stream("last_backfill_started") << last_backfill_started;
    {
      f->open_object_section("backfill_info");
      backfill_info.dump(f);
      f->close_section();
    }
    {
      f->open_array_section("peer_backfill_info");
      for (map<pg_shard_t, BackfillInterval>::const_iterator pbi =
             peer_backfill_info.begin();
           pbi != peer_backfill_info.end(); ++pbi) {
        f->dump_stream("osd") << pbi->first;
        f->open_object_section("BackfillInterval");
        pbi->second.dump(f);
        f->close_section();
      }
      f->close_section();
    }
    {
      f->open_array_section("backfills_in_flight");
      for (set<hobject_t>::const_iterator i = backfills_in_flight.begin();
           i != backfills_in_flight.end();
           ++i) {
        f->dump_stream("object") << *i;
      }
      f->close_section();
    }
    {
      f->open_array_section("recovering");
      for (map<hobject_t, ObjectContextRef>::const_iterator i = recovering.begin();
           i != recovering.end();
           ++i) {
        f->dump_stream("object") << i->first;
      }
      f->close_section();
    }
    {
      f->open_object_section("pg_backend");
      pgbackend->dump_recovery_info(f);
      f->close_section();
    }
  }

  /// last backfill operation started
  hobject_t last_backfill_started;
  bool new_backfill;

  int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
                                 PGBackend::RecoveryHandle *h);
  int prep_object_replica_deletes(const hobject_t& soid, eversion_t v,
                                  PGBackend::RecoveryHandle *h);

  void finish_degraded_object(const hobject_t& oid);

  // Cancels/resets pulls from peer
  void check_recovery_sources(const OSDMapRef& map) override;

  int recover_missing(
    const hobject_t& oid,
    eversion_t v,
    int priority,
    PGBackend::RecoveryHandle *h);

  // low level ops

  void _make_clone(
    OpContext *ctx,
    PGTransaction* t,
    ObjectContextRef obc,
    const hobject_t& head, const hobject_t& coid,
    object_info_t *poi);
  void execute_ctx(OpContext *ctx);
  void finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc=true);
  void reply_ctx(OpContext *ctx, int err);
  void reply_ctx(OpContext *ctx, int err, eversion_t v, version_t uv);
  void make_writeable(OpContext *ctx);
  void log_op_stats(OpContext *ctx);

  void write_update_size_and_usage(object_stat_sum_t& stats, object_info_t& oi,
                                   interval_set<uint64_t>& modified, uint64_t offset,
                                   uint64_t length, bool write_full=false);
  void add_interval_usage(interval_set<uint64_t>& s, object_stat_sum_t& st);


  enum class cache_result_t {
    NOOP,
    BLOCKED_FULL,
    BLOCKED_PROMOTE,
    HANDLED_PROXY,
    HANDLED_REDIRECT,
  };
  cache_result_t maybe_handle_cache_detail(OpRequestRef op,
                                           bool write_ordered,
                                           ObjectContextRef obc, int r,
                                           hobject_t missing_oid,
                                           bool must_promote,
                                           bool in_hit_set,
                                           ObjectContextRef *promote_obc);
  cache_result_t maybe_handle_manifest_detail(OpRequestRef op,
                                              bool write_ordered,
                                              ObjectContextRef obc);
  bool maybe_handle_manifest(OpRequestRef op,
                             bool write_ordered,
                             ObjectContextRef obc) {
    return cache_result_t::NOOP != maybe_handle_manifest_detail(
      op,
      write_ordered,
      obc);
  }

  /**
   * This helper function is called from do_op if the ObjectContext lookup fails.
   * @returns true if the caching code is handling the Op, false otherwise.
   */
  bool maybe_handle_cache(OpRequestRef op,
                          bool write_ordered,
                          ObjectContextRef obc, int r,
                          const hobject_t& missing_oid,
                          bool must_promote,
                          bool in_hit_set = false) {
    return cache_result_t::NOOP != maybe_handle_cache_detail(
      op,
      write_ordered,
      obc,
      r,
      missing_oid,
      must_promote,
      in_hit_set,
      nullptr);
  }

  /**
   * This helper function checks if a promotion is needed.
   */
  bool maybe_promote(ObjectContextRef obc,
                     const hobject_t& missing_oid,
                     const object_locator_t& oloc,
                     bool in_hit_set,
                     uint32_t recency,
                     OpRequestRef promote_op,
                     ObjectContextRef *promote_obc = nullptr);
  /**
   * This helper function tells the client to redirect their request elsewhere.
   */
  void do_cache_redirect(OpRequestRef op);
  /**
   * This function attempts to start a promote.  Either it succeeds,
   * or places op on a wait list.  If op is null, failure means that
   * this is a noop.  If a future user wants to be able to distinguish
   * these cases, a return value should be added.
   */
  void promote_object(
    ObjectContextRef obc,            ///< [optional] obc
    const hobject_t& missing_object, ///< oid (if !obc)
    const object_locator_t& oloc,    ///< locator for obc|oid
    OpRequestRef op,                 ///< [optional] client op
    ObjectContextRef *promote_obc = nullptr ///< [optional] new obc for object
    );

  int prepare_transaction(OpContext *ctx);
  list<pair<OpRequestRef, OpContext*> > in_progress_async_reads;
  void complete_read_ctx(int result, OpContext *ctx);

  // pg on-disk content
  void check_local() override;

  void _clear_recovery_state() override;

  bool start_recovery_ops(
    uint64_t max,
    ThreadPool::TPHandle &handle, uint64_t *started) override;

  uint64_t recover_primary(uint64_t max, ThreadPool::TPHandle &handle);
  uint64_t recover_replicas(uint64_t max, ThreadPool::TPHandle &handle);
  hobject_t earliest_peer_backfill() const;
  bool all_peer_done() const;
  /**
   * @param work_started will be set to true if recover_backfill got anywhere
   * @returns the number of operations started
   */
  uint64_t recover_backfill(uint64_t max, ThreadPool::TPHandle &handle,
                            bool *work_started);

  /**
   * scan a (hash) range of objects in the current pg
   *
   * @begin first item should be >= this value
   * @min return at least this many items, unless we are done
   * @max return no more than this many items
   * @bi [out] resulting map of objects to eversion_t's
   */
  void scan_range(
    int min, int max, BackfillInterval *bi,
    ThreadPool::TPHandle &handle
    );

  /// Update a hash range to reflect changes since the last scan
  void update_range(
    BackfillInterval *bi,        ///< [in,out] interval to update
    ThreadPool::TPHandle &handle ///< [in] tp handle
    );

  int prep_backfill_object_push(
    hobject_t oid, eversion_t v, ObjectContextRef obc,
    vector<pg_shard_t> peers,
    PGBackend::RecoveryHandle *h);
  void send_remove_op(const hobject_t& oid, eversion_t v, pg_shard_t peer);


  class C_OSD_OndiskWriteUnlock;
  class C_OSD_AppliedRecoveredObject;
  class C_OSD_CommittedPushedObject;
  class C_OSD_AppliedRecoveredObjectReplica;
  void sub_op_remove(OpRequestRef op);

  void _applied_recovered_object(ObjectContextRef obc);
  void _applied_recovered_object_replica();
  void _committed_pushed_object(epoch_t epoch, eversion_t lc);
  void recover_got(hobject_t oid, eversion_t v);

  // -- copyfrom --
  map<hobject_t, CopyOpRef> copy_ops;

  int do_copy_get(OpContext *ctx, bufferlist::iterator& bp, OSDOp& op,
                  ObjectContextRef& obc);
  int finish_copy_get();

  void fill_in_copy_get_noent(OpRequestRef& op, hobject_t oid,
                              OSDOp& osd_op);

  /**
   * To copy an object, call start_copy.
   *
   * @param cb: The CopyCallback to be activated when the copy is complete
   * @param obc: The ObjectContext we are copying into
   * @param src: The source object
   * @param oloc: the source object locator
   * @param version: the version of the source object to copy (0 for any)
   */
  void start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src,
                  object_locator_t oloc, version_t version, unsigned flags,
                  bool mirror_snapset, unsigned src_obj_fadvise_flags,
                  unsigned dest_obj_fadvise_flags);
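  // Rough usage sketch (illustrative, not from the source): a caller such as
  // the copy-from or promote path allocates a callback and hands it in, e.g.
  //
  //   CopyCallback *cb = new PromoteCallback(obc, this); // hypothetical setup
  //   start_copy(cb, obc, src, my_oloc, 0 /* any version */, flags,
  //              false /* mirror_snapset */, 0, 0);
  //
  // The callback is completed with a CopyCallbackResults once
  // process_copy_chunk has pulled the final chunk from the source.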
  void process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r);
  void _write_copy_chunk(CopyOpRef cop, PGTransaction *t);
  uint64_t get_copy_chunk_size() const {
    uint64_t size = cct->_conf->osd_copyfrom_max_chunk;
    if (pool.info.requires_aligned_append()) {
      uint64_t alignment = pool.info.required_alignment();
      if (size % alignment) {
        size += alignment - (size % alignment);
      }
    }
    return size;
  }
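  // Worked example (illustrative numbers): with osd_copyfrom_max_chunk set to
  // 1000000 bytes and a pool requiring 65536-byte aligned appends,
  // 1000000 % 65536 == 16960, so the chunk size is rounded up by
  // 65536 - 16960 = 48576 to 1048576 (exactly 16 * 65536).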
  void _copy_some(ObjectContextRef obc, CopyOpRef cop);
  void finish_copyfrom(CopyFromCallback *cb);
  void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
  void cancel_copy(CopyOpRef cop, bool requeue);
  void cancel_copy_ops(bool requeue);

  friend struct C_Copyfrom;

  // -- flush --
  map<hobject_t, FlushOpRef> flush_ops;

  /// start_flush takes ownership of on_flush iff ret == -EINPROGRESS
  int start_flush(
    OpRequestRef op, ObjectContextRef obc,
    bool blocking, hobject_t *pmissing,
    boost::optional<std::function<void()>> &&on_flush);
  void finish_flush(hobject_t oid, ceph_tid_t tid, int r);
  int try_flush_mark_clean(FlushOpRef fop);
  void cancel_flush(FlushOpRef fop, bool requeue);
  void cancel_flush_ops(bool requeue);

  /// @return false if the clone has been evicted
  bool is_present_clone(hobject_t coid);

  friend struct C_Flush;

  // -- scrub --
  bool _range_available_for_scrub(
    const hobject_t &begin, const hobject_t &end) override;
  void scrub_snapshot_metadata(
    ScrubMap &map,
    const std::map<hobject_t, pair<uint32_t, uint32_t>> &missing_digest) override;
  void _scrub_clear_state() override;
  void _scrub_finish() override;
  object_stat_collection_t scrub_cstat;

  void _split_into(pg_t child_pgid, PG *child,
                   unsigned split_bits) override;
  void apply_and_flush_repops(bool requeue);

  void calc_trim_to() override;
  int do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr);
  int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr);

  // -- checksum --
  int do_checksum(OpContext *ctx, OSDOp& osd_op, bufferlist::iterator *bl_it);
  int finish_checksum(OSDOp& osd_op, Checksummer::CSumType csum_type,
                      bufferlist::iterator *init_value_bl_it,
                      const bufferlist &read_bl);

  friend class C_ChecksumRead;

  int do_extent_cmp(OpContext *ctx, OSDOp& osd_op);
  int finish_extent_cmp(OSDOp& osd_op, const bufferlist &read_bl);

  friend class C_ExtentCmpRead;

  int do_read(OpContext *ctx, OSDOp& osd_op);
  int do_sparse_read(OpContext *ctx, OSDOp& osd_op);
  int do_writesame(OpContext *ctx, OSDOp& osd_op);

  bool pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata);
  int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter);

  map<hobject_t, list<OpRequestRef>> in_progress_proxy_ops;
  void kick_proxy_ops_blocked(hobject_t& soid);
  void cancel_proxy_ops(bool requeue);

  // -- proxyread --
  map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;

  void do_proxy_read(OpRequestRef op, ObjectContextRef obc = NULL);
  void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
  void cancel_proxy_read(ProxyReadOpRef prdop);

  friend struct C_ProxyRead;

  // -- proxywrite --
  map<ceph_tid_t, ProxyWriteOpRef> proxywrite_ops;

  void do_proxy_write(OpRequestRef op, const hobject_t& missing_oid, ObjectContextRef obc = NULL);
  void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r);
  void cancel_proxy_write(ProxyWriteOpRef pwop);

  friend struct C_ProxyWrite_Commit;

public:
  PrimaryLogPG(OSDService *o, OSDMapRef curmap,
               const PGPool &_pool, spg_t p);
  ~PrimaryLogPG() override {}

  int do_command(
    cmdmap_t cmdmap,
    ostream& ss,
    bufferlist& idata,
    bufferlist& odata,
    ConnectionRef conn,
    ceph_tid_t tid) override;

  void do_request(
    OpRequestRef& op,
    ThreadPool::TPHandle &handle) override;
  void do_op(OpRequestRef& op) override;
  void record_write_error(OpRequestRef op, const hobject_t &soid,
                          MOSDOpReply *orig_reply, int r);
  void do_pg_op(OpRequestRef op);
  void do_sub_op(OpRequestRef op) override;
  void do_sub_op_reply(OpRequestRef op) override;
  void do_scan(
    OpRequestRef op,
    ThreadPool::TPHandle &handle) override;
  void do_backfill(OpRequestRef op) override;
  void do_backfill_remove(OpRequestRef op);

  void handle_backoff(OpRequestRef& op);

  int trim_object(bool first, const hobject_t &coid, OpContextUPtr *ctxp);
  void snap_trimmer(epoch_t e) override;
  void kick_snap_trim() override;
  void snap_trimmer_scrub_complete() override;
  int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops);

  int _get_tmap(OpContext *ctx, bufferlist *header, bufferlist *vals);
  int do_tmap2omap(OpContext *ctx, unsigned flags);
  int do_tmapup(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op);
  int do_tmapup_slow(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op, bufferlist& bl);

  void do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn);
private:
  int do_scrub_ls(MOSDOp *op, OSDOp *osd_op);
  hobject_t earliest_backfill() const;
  bool check_src_targ(const hobject_t& soid, const hobject_t& toid) const;

  uint64_t temp_seq; ///< last id for naming temp objects
  /// generate a new temp object name
  hobject_t generate_temp_object(const hobject_t& target);
  /// generate a new temp object name (for recovery)
  hobject_t get_temp_recovery_object(const hobject_t& target,
                                     eversion_t version) override;
  int get_recovery_op_priority() const {
    int pri = 0;
    pool.info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
    return pri > 0 ? pri : cct->_conf->osd_recovery_op_priority;
  }
  void log_missing(unsigned missing,
                   const boost::optional<hobject_t> &head,
                   LogChannelRef clog,
                   const spg_t &pgid,
                   const char *func,
                   const char *mode,
                   bool allow_incomplete_clones);
  unsigned process_clones_to(const boost::optional<hobject_t> &head,
                             const boost::optional<SnapSet> &snapset,
                             LogChannelRef clog,
                             const spg_t &pgid,
                             const char *mode,
                             bool allow_incomplete_clones,
                             boost::optional<snapid_t> target,
                             vector<snapid_t>::reverse_iterator *curclone,
                             inconsistent_snapset_wrapper &snap_error);

public:
  coll_t get_coll() {
    return coll;
  }
  void split_colls(
    spg_t child,
    int split_bits,
    int seed,
    const pg_pool_t *pool,
    ObjectStore::Transaction *t) override {
    coll_t target = coll_t(child);
    PG::_create(*t, child, split_bits);
    t->split_collection(
      coll,
      split_bits,
      seed,
      target);
    PG::_init(*t, child, pool);
  }
private:

  struct DoSnapWork : boost::statechart::event< DoSnapWork > {
    DoSnapWork() : boost::statechart::event < DoSnapWork >() {}
  };
  struct KickTrim : boost::statechart::event< KickTrim > {
    KickTrim() : boost::statechart::event < KickTrim >() {}
  };
  struct RepopsComplete : boost::statechart::event< RepopsComplete > {
    RepopsComplete() : boost::statechart::event < RepopsComplete >() {}
  };
  struct ScrubComplete : boost::statechart::event< ScrubComplete > {
    ScrubComplete() : boost::statechart::event < ScrubComplete >() {}
  };
  struct TrimWriteUnblocked : boost::statechart::event< TrimWriteUnblocked > {
    TrimWriteUnblocked() : boost::statechart::event < TrimWriteUnblocked >() {}
  };
  struct Reset : boost::statechart::event< Reset > {
    Reset() : boost::statechart::event< Reset >() {}
  };
  struct SnapTrimReserved : boost::statechart::event< SnapTrimReserved > {
    SnapTrimReserved() : boost::statechart::event< SnapTrimReserved >() {}
  };
  struct SnapTrimTimerReady : boost::statechart::event< SnapTrimTimerReady > {
    SnapTrimTimerReady() : boost::statechart::event< SnapTrimTimerReady >() {}
  };

  struct NotTrimming;
  struct SnapTrimmer : public boost::statechart::state_machine< SnapTrimmer, NotTrimming > {
    PrimaryLogPG *pg;
    explicit SnapTrimmer(PrimaryLogPG *pg) : pg(pg) {}
    void log_enter(const char *state_name);
    void log_exit(const char *state_name, utime_t duration);
    bool can_trim() {
      return pg->is_clean() && !pg->scrubber.active && !pg->snap_trimq.empty();
    }
  } snap_trimmer_machine;

  struct WaitReservation;
  struct Trimming : boost::statechart::state< Trimming, SnapTrimmer, WaitReservation >, NamedState {
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< KickTrim >,
      boost::statechart::transition< Reset, NotTrimming >
      > reactions;

    set<hobject_t> in_flight;
    snapid_t snap_to_trim;

    explicit Trimming(my_context ctx)
      : my_base(ctx),
        NamedState(context< SnapTrimmer >().pg, "Trimming") {
      context< SnapTrimmer >().log_enter(state_name);
      assert(context< SnapTrimmer >().can_trim());
      assert(in_flight.empty());
    }
    void exit() {
      context< SnapTrimmer >().log_exit(state_name, enter_time);
      auto *pg = context< SnapTrimmer >().pg;
      pg->osd->snap_reserver.cancel_reservation(pg->get_pgid());
      pg->state_clear(PG_STATE_SNAPTRIM);
      pg->publish_stats_to_osd();
    }
    boost::statechart::result react(const KickTrim&) {
      return discard_event();
    }
  };

  /* SnapTrimmerStates */
  struct WaitTrimTimer : boost::statechart::state< WaitTrimTimer, Trimming >, NamedState {
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< SnapTrimTimerReady >
      > reactions;
    Context *wakeup = nullptr;
    explicit WaitTrimTimer(my_context ctx)
      : my_base(ctx),
        NamedState(context< SnapTrimmer >().pg, "Trimming/WaitTrimTimer") {
      context< SnapTrimmer >().log_enter(state_name);
      assert(context<Trimming>().in_flight.empty());
      struct OnTimer : Context {
        PrimaryLogPGRef pg;
        epoch_t epoch;
        OnTimer(PrimaryLogPGRef pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
        void finish(int) override {
          pg->lock();
          if (!pg->pg_has_reset_since(epoch))
            pg->snap_trimmer_machine.process_event(SnapTrimTimerReady());
          pg->unlock();
        }
      };
      auto *pg = context< SnapTrimmer >().pg;
      if (pg->cct->_conf->osd_snap_trim_sleep > 0) {
        wakeup = new OnTimer{pg, pg->get_osdmap()->get_epoch()};
        Mutex::Locker l(pg->osd->snap_sleep_lock);
        pg->osd->snap_sleep_timer.add_event_after(
          pg->cct->_conf->osd_snap_trim_sleep, wakeup);
      } else {
        post_event(SnapTrimTimerReady());
      }
    }
    void exit() {
      context< SnapTrimmer >().log_exit(state_name, enter_time);
      auto *pg = context< SnapTrimmer >().pg;
      if (wakeup) {
        Mutex::Locker l(pg->osd->snap_sleep_lock);
        pg->osd->snap_sleep_timer.cancel_event(wakeup);
        wakeup = nullptr;
      }
    }
    boost::statechart::result react(const SnapTrimTimerReady &) {
      wakeup = nullptr;
      if (!context< SnapTrimmer >().can_trim()) {
        post_event(KickTrim());
        return transit< NotTrimming >();
      } else {
        return transit< AwaitAsyncWork >();
      }
    }
  };

  struct WaitRWLock : boost::statechart::state< WaitRWLock, Trimming >, NamedState {
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< TrimWriteUnblocked >
      > reactions;
    explicit WaitRWLock(my_context ctx)
      : my_base(ctx),
        NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRWLock") {
      context< SnapTrimmer >().log_enter(state_name);
      assert(context<Trimming>().in_flight.empty());
    }
    void exit() {
      context< SnapTrimmer >().log_exit(state_name, enter_time);
    }
    boost::statechart::result react(const TrimWriteUnblocked&) {
      if (!context< SnapTrimmer >().can_trim()) {
        post_event(KickTrim());
        return transit< NotTrimming >();
      } else {
        return transit< AwaitAsyncWork >();
      }
    }
  };

  struct WaitRepops : boost::statechart::state< WaitRepops, Trimming >, NamedState {
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< RepopsComplete >
      > reactions;
    explicit WaitRepops(my_context ctx)
      : my_base(ctx),
        NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRepops") {
      context< SnapTrimmer >().log_enter(state_name);
      assert(!context<Trimming>().in_flight.empty());
    }
    void exit() {
      context< SnapTrimmer >().log_exit(state_name, enter_time);
    }
    boost::statechart::result react(const RepopsComplete&) {
      if (!context< SnapTrimmer >().can_trim()) {
        post_event(KickTrim());
        return transit< NotTrimming >();
      } else {
        return transit< WaitTrimTimer >();
      }
    }
  };

  struct AwaitAsyncWork : boost::statechart::state< AwaitAsyncWork, Trimming >, NamedState {
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< DoSnapWork >
      > reactions;
    explicit AwaitAsyncWork(my_context ctx);
    void exit() {
      context< SnapTrimmer >().log_exit(state_name, enter_time);
    }
    boost::statechart::result react(const DoSnapWork&);
  };

  struct WaitReservation : boost::statechart::state< WaitReservation, Trimming >, NamedState {
    /* WaitReservation is a sub-state of trimming simply so that exiting Trimming
     * always cancels the reservation */
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< SnapTrimReserved >
      > reactions;
    struct ReservationCB : public Context {
      PrimaryLogPGRef pg;
      bool canceled;
      ReservationCB(PrimaryLogPG *pg) : pg(pg), canceled(false) {}
      void finish(int) override {
        pg->lock();
        if (!canceled)
          pg->snap_trimmer_machine.process_event(SnapTrimReserved());
        pg->unlock();
      }
      void cancel() {
        assert(pg->is_locked());
        assert(!canceled);
        canceled = true;
      }
    };
    ReservationCB *pending = nullptr;

    explicit WaitReservation(my_context ctx)
      : my_base(ctx),
        NamedState(context< SnapTrimmer >().pg, "Trimming/WaitReservation") {
      context< SnapTrimmer >().log_enter(state_name);
      assert(context<Trimming>().in_flight.empty());
      auto *pg = context< SnapTrimmer >().pg;
      pending = new ReservationCB(pg);
      pg->osd->snap_reserver.request_reservation(
        pg->get_pgid(),
        pending,
        0);
      pg->state_set(PG_STATE_SNAPTRIM_WAIT);
      pg->publish_stats_to_osd();
    }
    boost::statechart::result react(const SnapTrimReserved&);
    void exit() {
      context< SnapTrimmer >().log_exit(state_name, enter_time);
      if (pending)
        pending->cancel();
      pending = nullptr;
      auto *pg = context< SnapTrimmer >().pg;
      pg->state_clear(PG_STATE_SNAPTRIM_WAIT);
      pg->state_clear(PG_STATE_SNAPTRIM_ERROR);
      pg->publish_stats_to_osd();
    }
  };

  struct WaitScrub : boost::statechart::state< WaitScrub, SnapTrimmer >, NamedState {
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< ScrubComplete >,
      boost::statechart::custom_reaction< KickTrim >,
      boost::statechart::transition< Reset, NotTrimming >
      > reactions;
    explicit WaitScrub(my_context ctx)
      : my_base(ctx),
        NamedState(context< SnapTrimmer >().pg, "Trimming/WaitScrub") {
      context< SnapTrimmer >().log_enter(state_name);
    }
    void exit() {
      context< SnapTrimmer >().log_exit(state_name, enter_time);
    }
    boost::statechart::result react(const ScrubComplete&) {
      post_event(KickTrim());
      return transit< NotTrimming >();
    }
    boost::statechart::result react(const KickTrim&) {
      return discard_event();
    }
  };

  struct NotTrimming : boost::statechart::state< NotTrimming, SnapTrimmer >, NamedState {
    typedef boost::mpl::list <
      boost::statechart::custom_reaction< KickTrim >,
      boost::statechart::transition< Reset, NotTrimming >
      > reactions;
    explicit NotTrimming(my_context ctx);
    void exit();
    boost::statechart::result react(const KickTrim&);
  };

  int _verify_no_head_clones(const hobject_t& soid,
                             const SnapSet& ss);
  // return true if we're creating a local object, false for a
  // whiteout or no change.
  void maybe_create_new_object(OpContext *ctx, bool ignore_transaction=false);
  int _delete_oid(OpContext *ctx, bool no_whiteout, bool try_no_whiteout);
  int _rollback_to(OpContext *ctx, ceph_osd_op& op);
public:
  bool is_missing_object(const hobject_t& oid) const;
  bool is_unreadable_object(const hobject_t &oid) const {
    return is_missing_object(oid) ||
      !missing_loc.readable_with_acting(oid, actingset);
  }
  void maybe_kick_recovery(const hobject_t &soid);
  void wait_for_unreadable_object(const hobject_t& oid, OpRequestRef op);
  void wait_for_all_missing(OpRequestRef op);

  bool is_degraded_or_backfilling_object(const hobject_t& oid);
  void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);

  void block_write_on_full_cache(
    const hobject_t& oid, OpRequestRef op);
  void block_for_clean(
    const hobject_t& oid, OpRequestRef op);
  void block_write_on_snap_rollback(
    const hobject_t& oid, ObjectContextRef obc, OpRequestRef op);
  void block_write_on_degraded_snap(const hobject_t& oid, OpRequestRef op);

  bool maybe_await_blocked_snapset(const hobject_t &soid, OpRequestRef op);
  void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
  void kick_object_context_blocked(ObjectContextRef obc);

  void maybe_force_recovery();

  void mark_all_unfound_lost(
    int what,
    ConnectionRef con,
    ceph_tid_t tid);
  eversion_t pick_newest_available(const hobject_t& oid);

  void do_update_log_missing(
    OpRequestRef &op);

  void do_update_log_missing_reply(
    OpRequestRef &op);

  void on_role_change() override;
  void on_pool_change() override;
  void _on_new_interval() override;
  void clear_async_reads();
  void on_change(ObjectStore::Transaction *t) override;
  void on_activate() override;
  void on_flushed() override;
  void on_removal(ObjectStore::Transaction *t) override;
  void on_shutdown() override;
  bool check_failsafe_full(ostream &ss) override;
  bool check_osdmap_full(const set<pg_shard_t> &missing_on) override;
  int rep_repair_primary_object(const hobject_t& soid, OpRequestRef op);

  // attr cache handling
  void setattr_maybe_cache(
    ObjectContextRef obc,
    OpContext *op,
    PGTransaction *t,
    const string &key,
    bufferlist &val);
  void setattrs_maybe_cache(
    ObjectContextRef obc,
    OpContext *op,
    PGTransaction *t,
    map<string, bufferlist> &attrs);
  void rmattr_maybe_cache(
    ObjectContextRef obc,
    OpContext *op,
    PGTransaction *t,
    const string &key);
  int getattr_maybe_cache(
    ObjectContextRef obc,
    const string &key,
    bufferlist *val);
  int getattrs_maybe_cache(
    ObjectContextRef obc,
    map<string, bufferlist> *out,
    bool user_only = false);
};

inline ostream& operator<<(ostream& out, const PrimaryLogPG::RepGather& repop)
{
  out << "repgather(" << &repop
      << " " << repop.v
      << " rep_tid=" << repop.rep_tid
      << " committed?=" << repop.all_committed
      << " applied?=" << repop.all_applied
      << " r=" << repop.r
      << ")";
  return out;
}

inline ostream& operator<<(ostream& out,
                           const PrimaryLogPG::ProxyWriteOpRef& pwop)
{
  out << "proxywrite(" << &pwop
      << " " << pwop->user_version
      << " pwop_tid=" << pwop->objecter_tid;
  if (pwop->ctx->op)
    out << " op=" << *(pwop->ctx->op->get_req());
  out << ")";
  return out;
}

void intrusive_ptr_add_ref(PrimaryLogPG::RepGather *repop);
void intrusive_ptr_release(PrimaryLogPG::RepGather *repop);


#endif