]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "journal/JournalTrimmer.h" | |
5 | #include "journal/Utils.h" | |
6 | #include "common/Cond.h" | |
7 | #include "common/errno.h" | |
8 | #include <limits> | |
9 | ||
10 | #define dout_subsys ceph_subsys_journaler | |
11 | #undef dout_prefix | |
12 | #define dout_prefix *_dout << "JournalTrimmer: " << this << " " | |
13 | ||
14 | namespace journal { | |
15 | ||
16 | struct JournalTrimmer::C_RemoveSet : public Context { | |
17 | JournalTrimmer *journal_trimmer; | |
18 | uint64_t object_set; | |
9f95a23c | 19 | ceph::mutex lock = ceph::make_mutex("JournalTrimmer::m_lock"); |
7c673cae FG |
20 | uint32_t refs; |
21 | int return_value; | |
22 | ||
23 | C_RemoveSet(JournalTrimmer *_journal_trimmer, uint64_t _object_set, | |
24 | uint8_t _splay_width); | |
25 | void complete(int r) override; | |
26 | void finish(int r) override { | |
27 | journal_trimmer->handle_set_removed(r, object_set); | |
28 | journal_trimmer->m_async_op_tracker.finish_op(); | |
29 | } | |
30 | }; | |
31 | ||
32 | JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx, | |
33 | const std::string &object_oid_prefix, | |
9f95a23c | 34 | const ceph::ref_t<JournalMetadata>& journal_metadata) |
7c673cae FG |
35 | : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), |
36 | m_journal_metadata(journal_metadata), m_metadata_listener(this), | |
9f95a23c | 37 | m_remove_set_pending(false), |
7c673cae FG |
38 | m_remove_set(0), m_remove_set_ctx(NULL) { |
39 | m_ioctx.dup(ioctx); | |
40 | m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct()); | |
41 | ||
42 | m_journal_metadata->add_listener(&m_metadata_listener); | |
43 | } | |
44 | ||
45 | JournalTrimmer::~JournalTrimmer() { | |
11fdf7f2 | 46 | ceph_assert(m_shutdown); |
7c673cae FG |
47 | } |
48 | ||
49 | void JournalTrimmer::shut_down(Context *on_finish) { | |
50 | ldout(m_cct, 20) << __func__ << dendl; | |
51 | { | |
9f95a23c | 52 | std::lock_guard locker{m_lock}; |
11fdf7f2 | 53 | ceph_assert(!m_shutdown); |
7c673cae FG |
54 | m_shutdown = true; |
55 | } | |
56 | ||
57 | m_journal_metadata->remove_listener(&m_metadata_listener); | |
58 | ||
59 | // chain the shut down sequence (reverse order) | |
9f95a23c | 60 | on_finish = new LambdaContext([this, on_finish](int r) { |
7c673cae FG |
61 | m_async_op_tracker.wait_for_ops(on_finish); |
62 | }); | |
63 | m_journal_metadata->flush_commit_position(on_finish); | |
64 | } | |
65 | ||
66 | void JournalTrimmer::remove_objects(bool force, Context *on_finish) { | |
67 | ldout(m_cct, 20) << __func__ << dendl; | |
68 | ||
9f95a23c TL |
69 | on_finish = new LambdaContext([this, force, on_finish](int r) { |
70 | std::lock_guard locker{m_lock}; | |
7c673cae FG |
71 | |
72 | if (m_remove_set_pending) { | |
73 | on_finish->complete(-EBUSY); | |
74 | } | |
75 | ||
76 | if (!force) { | |
77 | JournalMetadata::RegisteredClients registered_clients; | |
78 | m_journal_metadata->get_registered_clients(®istered_clients); | |
79 | ||
80 | if (registered_clients.size() == 0) { | |
81 | on_finish->complete(-EINVAL); | |
82 | return; | |
83 | } else if (registered_clients.size() > 1) { | |
84 | on_finish->complete(-EBUSY); | |
85 | return; | |
86 | } | |
87 | } | |
88 | ||
89 | m_remove_set = std::numeric_limits<uint64_t>::max(); | |
90 | m_remove_set_pending = true; | |
91 | m_remove_set_ctx = on_finish; | |
92 | ||
93 | remove_set(m_journal_metadata->get_minimum_set()); | |
94 | }); | |
95 | ||
96 | m_async_op_tracker.wait_for_ops(on_finish); | |
97 | } | |
98 | ||
99 | void JournalTrimmer::committed(uint64_t commit_tid) { | |
100 | ldout(m_cct, 20) << __func__ << ": commit_tid=" << commit_tid << dendl; | |
101 | m_journal_metadata->committed(commit_tid, | |
102 | m_create_commit_position_safe_context); | |
103 | } | |
104 | ||
105 | void JournalTrimmer::trim_objects(uint64_t minimum_set) { | |
9f95a23c | 106 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
107 | |
108 | ldout(m_cct, 20) << __func__ << ": min_set=" << minimum_set << dendl; | |
109 | if (minimum_set <= m_journal_metadata->get_minimum_set()) { | |
110 | return; | |
111 | } | |
112 | ||
113 | if (m_remove_set_pending) { | |
11fdf7f2 | 114 | m_remove_set = std::max(m_remove_set, minimum_set); |
7c673cae FG |
115 | return; |
116 | } | |
117 | ||
118 | m_remove_set = minimum_set; | |
119 | m_remove_set_pending = true; | |
120 | remove_set(m_journal_metadata->get_minimum_set()); | |
121 | } | |
122 | ||
123 | void JournalTrimmer::remove_set(uint64_t object_set) { | |
9f95a23c | 124 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
125 | |
126 | m_async_op_tracker.start_op(); | |
127 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
128 | C_RemoveSet *ctx = new C_RemoveSet(this, object_set, splay_width); | |
129 | ||
130 | ldout(m_cct, 20) << __func__ << ": removing object set " << object_set | |
131 | << dendl; | |
132 | for (uint64_t object_number = object_set * splay_width; | |
133 | object_number < (object_set + 1) * splay_width; | |
134 | ++object_number) { | |
135 | std::string oid = utils::get_object_name(m_object_oid_prefix, | |
136 | object_number); | |
137 | ||
138 | ldout(m_cct, 20) << "removing journal object " << oid << dendl; | |
9f95a23c TL |
139 | auto comp = |
140 | librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback); | |
11fdf7f2 TL |
141 | int r = m_ioctx.aio_remove(oid, comp, |
142 | CEPH_OSD_FLAG_FULL_FORCE | CEPH_OSD_FLAG_FULL_TRY); | |
143 | ceph_assert(r == 0); | |
7c673cae FG |
144 | comp->release(); |
145 | } | |
146 | } | |
147 | ||
148 | void JournalTrimmer::handle_metadata_updated() { | |
149 | ldout(m_cct, 20) << __func__ << dendl; | |
150 | ||
9f95a23c | 151 | std::lock_guard locker{m_lock}; |
7c673cae FG |
152 | |
153 | JournalMetadata::RegisteredClients registered_clients; | |
154 | m_journal_metadata->get_registered_clients(®istered_clients); | |
155 | ||
156 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
157 | uint64_t minimum_set = m_journal_metadata->get_minimum_set(); | |
158 | uint64_t active_set = m_journal_metadata->get_active_set(); | |
159 | uint64_t minimum_commit_set = active_set; | |
160 | std::string minimum_client_id; | |
161 | ||
162 | for (auto &client : registered_clients) { | |
163 | if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) { | |
164 | continue; | |
165 | } | |
166 | ||
167 | if (client.commit_position.object_positions.empty()) { | |
168 | // client hasn't recorded any commits | |
169 | minimum_commit_set = minimum_set; | |
170 | minimum_client_id = client.id; | |
171 | break; | |
172 | } | |
173 | ||
174 | for (auto &position : client.commit_position.object_positions) { | |
175 | uint64_t object_set = position.object_number / splay_width; | |
176 | if (object_set < minimum_commit_set) { | |
177 | minimum_client_id = client.id; | |
178 | minimum_commit_set = object_set; | |
179 | } | |
180 | } | |
181 | } | |
182 | ||
183 | if (minimum_commit_set > minimum_set) { | |
184 | trim_objects(minimum_commit_set); | |
185 | } else { | |
186 | ldout(m_cct, 20) << "object set " << minimum_commit_set << " still " | |
187 | << "in-use by client " << minimum_client_id << dendl; | |
188 | } | |
189 | } | |
190 | ||
191 | void JournalTrimmer::handle_set_removed(int r, uint64_t object_set) { | |
192 | ldout(m_cct, 20) << __func__ << ": r=" << r << ", set=" << object_set << ", " | |
193 | << "trim=" << m_remove_set << dendl; | |
194 | ||
9f95a23c | 195 | std::lock_guard locker{m_lock}; |
7c673cae FG |
196 | m_remove_set_pending = false; |
197 | ||
198 | if (r == -ENOENT) { | |
199 | // no objects within the set existed | |
200 | r = 0; | |
201 | } | |
202 | if (r == 0) { | |
203 | // advance the minimum set to the next set | |
204 | m_journal_metadata->set_minimum_set(object_set + 1); | |
205 | uint64_t active_set = m_journal_metadata->get_active_set(); | |
206 | uint64_t minimum_set = m_journal_metadata->get_minimum_set(); | |
207 | ||
208 | if (m_remove_set > minimum_set && minimum_set <= active_set) { | |
209 | m_remove_set_pending = true; | |
210 | remove_set(minimum_set); | |
211 | } | |
212 | } | |
213 | ||
214 | if (m_remove_set_ctx != nullptr && !m_remove_set_pending) { | |
215 | ldout(m_cct, 20) << "completing remove set context" << dendl; | |
216 | m_remove_set_ctx->complete(r); | |
217 | m_remove_set_ctx = nullptr; | |
218 | } | |
219 | } | |
220 | ||
221 | JournalTrimmer::C_RemoveSet::C_RemoveSet(JournalTrimmer *_journal_trimmer, | |
222 | uint64_t _object_set, | |
223 | uint8_t _splay_width) | |
224 | : journal_trimmer(_journal_trimmer), object_set(_object_set), | |
9f95a23c | 225 | lock(ceph::make_mutex(utils::unique_lock_name("C_RemoveSet::lock", this))), |
7c673cae FG |
226 | refs(_splay_width), return_value(-ENOENT) { |
227 | } | |
228 | ||
229 | void JournalTrimmer::C_RemoveSet::complete(int r) { | |
9f95a23c | 230 | lock.lock(); |
7c673cae FG |
231 | if (r < 0 && r != -ENOENT && |
232 | (return_value == -ENOENT || return_value == 0)) { | |
233 | return_value = r; | |
234 | } else if (r == 0 && return_value == -ENOENT) { | |
235 | return_value = 0; | |
236 | } | |
237 | ||
238 | if (--refs == 0) { | |
239 | finish(return_value); | |
9f95a23c | 240 | lock.unlock(); |
7c673cae FG |
241 | delete this; |
242 | } else { | |
9f95a23c | 243 | lock.unlock(); |
7c673cae FG |
244 | } |
245 | } | |
246 | ||
247 | } // namespace journal |