]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <map>
#include <optional>
#include <string>
#include <string_view>
#include <system_error>

#include <boost/container/flat_set.hpp>

#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>

#include "crimson/common/type_helpers.h"
#include "crimson/os/futurized_store.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/osd/object_context.h"
#include "crimson/osd/shard_services.h"

#include "messages/MOSDPGBackfill.h"
#include "messages/MOSDPGBackfillRemove.h"
#include "messages/MOSDPGScan.h"
#include "osd/recovery_types.h"
#include "osd/osd_types.h"
19 | ||
20 | namespace crimson::osd{ | |
21 | class PG; | |
22 | } | |
23 | ||
24 | class PGBackend; | |
25 | ||
26 | class RecoveryBackend { | |
27 | void handle_backfill_finish( | |
28 | MOSDPGBackfill& m); | |
29 | seastar::future<> handle_backfill_progress( | |
30 | MOSDPGBackfill& m); | |
31 | seastar::future<> handle_backfill_finish_ack( | |
32 | MOSDPGBackfill& m); | |
33 | seastar::future<> handle_backfill(MOSDPGBackfill& m); | |
34 | ||
35 | seastar::future<> handle_backfill_remove(MOSDPGBackfillRemove& m); | |
36 | ||
37 | seastar::future<> handle_scan_get_digest( | |
38 | MOSDPGScan& m); | |
39 | seastar::future<> handle_scan_digest( | |
40 | MOSDPGScan& m); | |
41 | seastar::future<> handle_scan( | |
42 | MOSDPGScan& m); | |
43 | protected: | |
44 | class WaitForObjectRecovery; | |
45 | public: | |
46 | RecoveryBackend(crimson::osd::PG& pg, | |
47 | crimson::osd::ShardServices& shard_services, | |
48 | crimson::os::CollectionRef coll, | |
49 | PGBackend* backend) | |
50 | : pg{pg}, | |
51 | shard_services{shard_services}, | |
52 | store{&shard_services.get_store()}, | |
53 | coll{coll}, | |
54 | backend{backend} {} | |
55 | virtual ~RecoveryBackend() {} | |
56 | WaitForObjectRecovery& add_recovering(const hobject_t& soid) { | |
57 | auto [it, added] = recovering.emplace(soid, WaitForObjectRecovery{}); | |
58 | assert(added); | |
59 | return it->second; | |
60 | } | |
61 | WaitForObjectRecovery& get_recovering(const hobject_t& soid) { | |
62 | assert(is_recovering(soid)); | |
63 | return recovering.at(soid); | |
64 | } | |
65 | void remove_recovering(const hobject_t& soid) { | |
66 | recovering.erase(soid); | |
67 | } | |
68 | bool is_recovering(const hobject_t& soid) const { | |
69 | return recovering.count(soid) != 0; | |
70 | } | |
71 | uint64_t total_recovering() const { | |
72 | return recovering.size(); | |
73 | } | |
74 | ||
75 | virtual seastar::future<> handle_recovery_op( | |
76 | Ref<MOSDFastDispatchOp> m); | |
77 | ||
78 | virtual seastar::future<> recover_object( | |
79 | const hobject_t& soid, | |
80 | eversion_t need) = 0; | |
81 | virtual seastar::future<> recover_delete( | |
82 | const hobject_t& soid, | |
83 | eversion_t need) = 0; | |
84 | virtual seastar::future<> push_delete( | |
85 | const hobject_t& soid, | |
86 | eversion_t need) = 0; | |
87 | ||
88 | seastar::future<BackfillInterval> scan_for_backfill( | |
89 | const hobject_t& from, | |
90 | std::int64_t min, | |
91 | std::int64_t max); | |
92 | ||
93 | void on_peering_interval_change(ceph::os::Transaction& t) { | |
94 | clean_up(t, "new peering interval"); | |
95 | } | |
96 | ||
97 | seastar::future<> stop() { | |
98 | for (auto& [soid, recovery_waiter] : recovering) { | |
99 | recovery_waiter.stop(); | |
100 | } | |
101 | return on_stop(); | |
102 | } | |
103 | protected: | |
104 | crimson::osd::PG& pg; | |
105 | crimson::osd::ShardServices& shard_services; | |
106 | crimson::os::FuturizedStore* store; | |
107 | crimson::os::CollectionRef coll; | |
108 | PGBackend* backend; | |
109 | ||
110 | struct PullInfo { | |
111 | pg_shard_t from; | |
112 | hobject_t soid; | |
113 | ObjectRecoveryProgress recovery_progress; | |
114 | ObjectRecoveryInfo recovery_info; | |
115 | crimson::osd::ObjectContextRef head_ctx; | |
116 | crimson::osd::ObjectContextRef obc; | |
117 | object_stat_sum_t stat; | |
118 | bool is_complete() const { | |
119 | return recovery_progress.is_complete(recovery_info); | |
120 | } | |
121 | }; | |
122 | ||
123 | struct PushInfo { | |
124 | ObjectRecoveryProgress recovery_progress; | |
125 | ObjectRecoveryInfo recovery_info; | |
126 | crimson::osd::ObjectContextRef obc; | |
127 | object_stat_sum_t stat; | |
128 | }; | |
129 | ||
130 | class WaitForObjectRecovery : public crimson::osd::BlockerT<WaitForObjectRecovery> { | |
131 | seastar::shared_promise<> readable, recovered, pulled; | |
132 | std::map<pg_shard_t, seastar::shared_promise<>> pushes; | |
133 | public: | |
134 | static constexpr const char* type_name = "WaitForObjectRecovery"; | |
135 | ||
136 | crimson::osd::ObjectContextRef obc; | |
137 | std::optional<PullInfo> pi; | |
138 | std::map<pg_shard_t, PushInfo> pushing; | |
139 | ||
140 | seastar::future<> wait_for_readable() { | |
141 | return readable.get_shared_future(); | |
142 | } | |
143 | seastar::future<> wait_for_pushes(pg_shard_t shard) { | |
144 | return pushes[shard].get_shared_future(); | |
145 | } | |
146 | seastar::future<> wait_for_recovered() { | |
147 | return recovered.get_shared_future(); | |
148 | } | |
149 | crimson::osd::blocking_future<> | |
150 | wait_for_recovered_blocking() { | |
151 | return make_blocking_future( | |
152 | recovered.get_shared_future()); | |
153 | } | |
154 | seastar::future<> wait_for_pull() { | |
155 | return pulled.get_shared_future(); | |
156 | } | |
157 | void set_readable() { | |
158 | readable.set_value(); | |
159 | } | |
160 | void set_recovered() { | |
161 | recovered.set_value(); | |
162 | } | |
163 | void set_pushed(pg_shard_t shard) { | |
164 | pushes[shard].set_value(); | |
165 | } | |
166 | void set_pulled() { | |
167 | pulled.set_value(); | |
168 | } | |
169 | void set_push_failed(pg_shard_t shard, std::exception_ptr e) { | |
170 | pushes.at(shard).set_exception(e); | |
171 | } | |
172 | void interrupt(std::string_view why) { | |
173 | readable.set_exception(std::system_error( | |
174 | std::make_error_code(std::errc::interrupted), why.data())); | |
175 | recovered.set_exception(std::system_error( | |
176 | std::make_error_code(std::errc::interrupted), why.data())); | |
177 | pulled.set_exception(std::system_error( | |
178 | std::make_error_code(std::errc::interrupted), why.data())); | |
179 | for (auto& [pg_shard, pr] : pushes) { | |
180 | pr.set_exception(std::system_error( | |
181 | std::make_error_code(std::errc::interrupted), why.data())); | |
182 | } | |
183 | } | |
184 | void stop(); | |
185 | void dump_detail(Formatter* f) const { | |
186 | } | |
187 | }; | |
188 | std::map<hobject_t, WaitForObjectRecovery> recovering; | |
189 | hobject_t get_temp_recovery_object( | |
190 | const hobject_t& target, | |
191 | eversion_t version) const; | |
192 | ||
193 | boost::container::flat_set<hobject_t> temp_contents; | |
194 | ||
195 | void add_temp_obj(const hobject_t &oid) { | |
196 | temp_contents.insert(oid); | |
197 | } | |
198 | void clear_temp_obj(const hobject_t &oid) { | |
199 | temp_contents.erase(oid); | |
200 | } | |
201 | void clean_up(ceph::os::Transaction& t, std::string_view why); | |
202 | virtual seastar::future<> on_stop() = 0; | |
203 | }; |