]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "journal/JournalPlayer.h" | |
5 | #include "journal/Entry.h" | |
6 | #include "journal/JournalMetadata.h" | |
7 | #include "journal/ReplayHandler.h" | |
8 | #include "include/stringify.h" | |
9f95a23c | 9 | #include "common/ceph_mutex.h" |
7c673cae FG |
10 | #include "gtest/gtest.h" |
11 | #include "test/journal/RadosTestFixture.h" | |
12 | #include <list> | |
13 | #include <boost/scope_exit.hpp> | |
14 | ||
20effc67 | 15 | using namespace std::chrono_literals; |
7c673cae FG |
16 | typedef std::list<journal::Entry> Entries; |
17 | ||
18 | template <typename T> | |
19 | class TestJournalPlayer : public RadosTestFixture { | |
20 | public: | |
21 | typedef std::list<journal::JournalPlayer *> JournalPlayers; | |
22 | ||
23 | static const uint64_t max_fetch_bytes = T::max_fetch_bytes; | |
24 | ||
25 | struct ReplayHandler : public journal::ReplayHandler { | |
9f95a23c TL |
26 | ceph::mutex lock = ceph::make_mutex("lock"); |
27 | ceph::condition_variable cond; | |
7c673cae FG |
28 | bool entries_available; |
29 | bool complete; | |
30 | int complete_result; | |
31 | ||
32 | ReplayHandler() | |
9f95a23c | 33 | : entries_available(false), complete(false), |
7c673cae FG |
34 | complete_result(0) {} |
35 | ||
7c673cae | 36 | void handle_entries_available() override { |
9f95a23c | 37 | std::lock_guard locker{lock}; |
7c673cae | 38 | entries_available = true; |
9f95a23c | 39 | cond.notify_all(); |
7c673cae FG |
40 | } |
41 | ||
42 | void handle_complete(int r) override { | |
9f95a23c | 43 | std::lock_guard locker{lock}; |
7c673cae FG |
44 | complete = true; |
45 | complete_result = r; | |
9f95a23c | 46 | cond.notify_all(); |
7c673cae FG |
47 | } |
48 | }; | |
49 | ||
50 | void TearDown() override { | |
51 | for (JournalPlayers::iterator it = m_players.begin(); | |
52 | it != m_players.end(); ++it) { | |
53 | delete *it; | |
54 | } | |
55 | RadosTestFixture::TearDown(); | |
56 | } | |
57 | ||
9f95a23c | 58 | auto create_metadata(const std::string &oid) { |
7c673cae FG |
59 | return RadosTestFixture::create_metadata(oid, "client", 0.1, |
60 | max_fetch_bytes); | |
61 | } | |
62 | ||
63 | int client_commit(const std::string &oid, | |
64 | journal::JournalPlayer::ObjectSetPosition position) { | |
65 | return RadosTestFixture::client_commit(oid, "client", position); | |
66 | } | |
67 | ||
68 | journal::Entry create_entry(uint64_t tag_tid, uint64_t entry_tid) { | |
69 | std::string payload(128, '0'); | |
70 | bufferlist payload_bl; | |
71 | payload_bl.append(payload); | |
72 | return journal::Entry(tag_tid, entry_tid, payload_bl); | |
73 | } | |
74 | ||
75 | journal::JournalPlayer *create_player(const std::string &oid, | |
9f95a23c | 76 | const ceph::ref_t<journal::JournalMetadata>& metadata) { |
7c673cae | 77 | journal::JournalPlayer *player(new journal::JournalPlayer( |
9f95a23c | 78 | m_ioctx, oid + ".", metadata, &m_replay_hander, nullptr)); |
7c673cae FG |
79 | m_players.push_back(player); |
80 | return player; | |
81 | } | |
82 | ||
83 | bool wait_for_entries(journal::JournalPlayer *player, uint32_t count, | |
84 | Entries *entries) { | |
85 | entries->clear(); | |
86 | while (entries->size() < count) { | |
87 | journal::Entry entry; | |
88 | uint64_t commit_tid; | |
89 | while (entries->size() < count && | |
90 | player->try_pop_front(&entry, &commit_tid)) { | |
91 | entries->push_back(entry); | |
92 | } | |
93 | if (entries->size() == count) { | |
94 | break; | |
95 | } | |
96 | ||
9f95a23c | 97 | std::unique_lock locker{m_replay_hander.lock}; |
7c673cae FG |
98 | if (m_replay_hander.entries_available) { |
99 | m_replay_hander.entries_available = false; | |
9f95a23c TL |
100 | } else if (m_replay_hander.cond.wait_for(locker, 10s) == |
101 | std::cv_status::timeout) { | |
7c673cae FG |
102 | break; |
103 | } | |
104 | } | |
105 | return entries->size() == count; | |
106 | } | |
107 | ||
108 | bool wait_for_complete(journal::JournalPlayer *player) { | |
9f95a23c | 109 | std::unique_lock locker{m_replay_hander.lock}; |
7c673cae FG |
110 | while (!m_replay_hander.complete) { |
111 | journal::Entry entry; | |
112 | uint64_t commit_tid; | |
113 | player->try_pop_front(&entry, &commit_tid); | |
114 | ||
9f95a23c TL |
115 | if (m_replay_hander.cond.wait_for(locker, 10s) == |
116 | std::cv_status::timeout) { | |
7c673cae FG |
117 | return false; |
118 | } | |
119 | } | |
120 | m_replay_hander.complete = false; | |
121 | return true; | |
122 | } | |
123 | ||
124 | int write_entry(const std::string &oid, uint64_t object_num, | |
125 | uint64_t tag_tid, uint64_t entry_tid) { | |
126 | bufferlist bl; | |
11fdf7f2 | 127 | encode(create_entry(tag_tid, entry_tid), bl); |
7c673cae FG |
128 | return append(oid + "." + stringify(object_num), bl); |
129 | } | |
130 | ||
131 | JournalPlayers m_players; | |
132 | ReplayHandler m_replay_hander; | |
133 | }; | |
134 | ||
135 | template <uint64_t _max_fetch_bytes> | |
136 | class TestJournalPlayerParams { | |
137 | public: | |
138 | static const uint64_t max_fetch_bytes = _max_fetch_bytes; | |
139 | }; | |
140 | ||
141 | typedef ::testing::Types<TestJournalPlayerParams<0>, | |
142 | TestJournalPlayerParams<16> > TestJournalPlayerTypes; | |
9f95a23c | 143 | TYPED_TEST_SUITE(TestJournalPlayer, TestJournalPlayerTypes); |
7c673cae FG |
144 | |
145 | TYPED_TEST(TestJournalPlayer, Prefetch) { | |
146 | std::string oid = this->get_temp_oid(); | |
147 | ||
148 | journal::JournalPlayer::ObjectPositions positions; | |
149 | positions = { | |
150 | cls::journal::ObjectPosition(0, 234, 122) }; | |
151 | cls::journal::ObjectSetPosition commit_position(positions); | |
152 | ||
153 | ASSERT_EQ(0, this->create(oid)); | |
154 | ASSERT_EQ(0, this->client_register(oid)); | |
155 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
156 | ||
9f95a23c | 157 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
158 | ASSERT_EQ(0, this->init_metadata(metadata)); |
159 | ||
160 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
161 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
162 | C_SaferCond unwatch_ctx; | |
163 | player->shut_down(&unwatch_ctx); | |
164 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
165 | }; | |
166 | ||
167 | ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122)); | |
168 | ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123)); | |
169 | ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124)); | |
170 | ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125)); | |
171 | ||
172 | player->prefetch(); | |
173 | ||
174 | Entries entries; | |
175 | ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); | |
176 | ASSERT_TRUE(this->wait_for_complete(player)); | |
177 | ||
178 | Entries expected_entries; | |
179 | expected_entries = { | |
180 | this->create_entry(234, 123), | |
181 | this->create_entry(234, 124), | |
182 | this->create_entry(234, 125)}; | |
183 | ASSERT_EQ(expected_entries, entries); | |
184 | ||
185 | uint64_t last_tid; | |
186 | ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid)); | |
187 | ASSERT_EQ(125U, last_tid); | |
188 | } | |
189 | ||
190 | TYPED_TEST(TestJournalPlayer, PrefetchSkip) { | |
191 | std::string oid = this->get_temp_oid(); | |
192 | ||
193 | journal::JournalPlayer::ObjectPositions positions; | |
194 | positions = { | |
195 | cls::journal::ObjectPosition(0, 234, 125), | |
196 | cls::journal::ObjectPosition(1, 234, 124) }; | |
197 | cls::journal::ObjectSetPosition commit_position(positions); | |
198 | ||
199 | ASSERT_EQ(0, this->create(oid)); | |
200 | ASSERT_EQ(0, this->client_register(oid)); | |
201 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
202 | ||
9f95a23c | 203 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
204 | ASSERT_EQ(0, this->init_metadata(metadata)); |
205 | ||
206 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
207 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
208 | C_SaferCond unwatch_ctx; | |
209 | player->shut_down(&unwatch_ctx); | |
210 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
211 | }; | |
212 | ||
213 | ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122)); | |
214 | ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123)); | |
215 | ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124)); | |
216 | ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125)); | |
217 | ||
218 | player->prefetch(); | |
219 | ||
220 | Entries entries; | |
221 | ASSERT_TRUE(this->wait_for_entries(player, 0, &entries)); | |
222 | ASSERT_TRUE(this->wait_for_complete(player)); | |
223 | ||
224 | uint64_t last_tid; | |
225 | ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid)); | |
226 | ASSERT_EQ(125U, last_tid); | |
227 | } | |
228 | ||
229 | TYPED_TEST(TestJournalPlayer, PrefetchWithoutCommit) { | |
230 | std::string oid = this->get_temp_oid(); | |
231 | ||
232 | cls::journal::ObjectSetPosition commit_position; | |
233 | ||
234 | ASSERT_EQ(0, this->create(oid)); | |
235 | ASSERT_EQ(0, this->client_register(oid)); | |
236 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
237 | ||
9f95a23c | 238 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
239 | ASSERT_EQ(0, this->init_metadata(metadata)); |
240 | ||
241 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
242 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
243 | C_SaferCond unwatch_ctx; | |
244 | player->shut_down(&unwatch_ctx); | |
245 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
246 | }; | |
247 | ||
248 | ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122)); | |
249 | ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123)); | |
250 | ||
251 | player->prefetch(); | |
252 | ||
253 | Entries entries; | |
254 | ASSERT_TRUE(this->wait_for_entries(player, 2, &entries)); | |
255 | ASSERT_TRUE(this->wait_for_complete(player)); | |
256 | ||
257 | Entries expected_entries; | |
258 | expected_entries = { | |
259 | this->create_entry(234, 122), | |
260 | this->create_entry(234, 123)}; | |
261 | ASSERT_EQ(expected_entries, entries); | |
262 | } | |
263 | ||
264 | TYPED_TEST(TestJournalPlayer, PrefetchMultipleTags) { | |
265 | std::string oid = this->get_temp_oid(); | |
266 | ||
267 | journal::JournalPlayer::ObjectPositions positions; | |
268 | positions = { | |
269 | cls::journal::ObjectPosition(2, 234, 122), | |
270 | cls::journal::ObjectPosition(1, 234, 121), | |
271 | cls::journal::ObjectPosition(0, 234, 120)}; | |
272 | cls::journal::ObjectSetPosition commit_position(positions); | |
273 | ||
274 | ASSERT_EQ(0, this->create(oid, 14, 3)); | |
275 | ASSERT_EQ(0, this->client_register(oid)); | |
276 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
277 | ||
9f95a23c | 278 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
279 | ASSERT_EQ(0, this->init_metadata(metadata)); |
280 | ||
281 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
282 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
283 | C_SaferCond unwatch_ctx; | |
284 | player->shut_down(&unwatch_ctx); | |
285 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
286 | }; | |
287 | ||
288 | ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120)); | |
289 | ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121)); | |
290 | ASSERT_EQ(0, this->write_entry(oid, 2, 234, 122)); | |
291 | ASSERT_EQ(0, this->write_entry(oid, 0, 234, 123)); | |
292 | ASSERT_EQ(0, this->write_entry(oid, 1, 234, 124)); | |
293 | ASSERT_EQ(0, this->write_entry(oid, 0, 236, 0)); // new tag allocated | |
294 | ||
295 | player->prefetch(); | |
296 | ||
297 | Entries entries; | |
298 | ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); | |
299 | ASSERT_TRUE(this->wait_for_complete(player)); | |
300 | ||
301 | uint64_t last_tid; | |
302 | ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid)); | |
303 | ASSERT_EQ(124U, last_tid); | |
304 | ASSERT_TRUE(metadata->get_last_allocated_entry_tid(236, &last_tid)); | |
305 | ASSERT_EQ(0U, last_tid); | |
306 | } | |
307 | ||
308 | TYPED_TEST(TestJournalPlayer, PrefetchCorruptSequence) { | |
309 | std::string oid = this->get_temp_oid(); | |
310 | ||
311 | cls::journal::ObjectSetPosition commit_position; | |
312 | ||
313 | ASSERT_EQ(0, this->create(oid)); | |
314 | ASSERT_EQ(0, this->client_register(oid)); | |
315 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
316 | ||
9f95a23c | 317 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
318 | ASSERT_EQ(0, this->init_metadata(metadata)); |
319 | ||
320 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
321 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
322 | C_SaferCond unwatch_ctx; | |
323 | player->shut_down(&unwatch_ctx); | |
324 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
325 | }; | |
326 | ||
327 | ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120)); | |
328 | ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121)); | |
329 | ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124)); | |
330 | ||
331 | player->prefetch(); | |
332 | Entries entries; | |
333 | ASSERT_TRUE(this->wait_for_entries(player, 2, &entries)); | |
334 | ||
335 | journal::Entry entry; | |
336 | uint64_t commit_tid; | |
337 | ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); | |
338 | ASSERT_TRUE(this->wait_for_complete(player)); | |
339 | ASSERT_EQ(-ENOMSG, this->m_replay_hander.complete_result); | |
340 | } | |
341 | ||
342 | TYPED_TEST(TestJournalPlayer, PrefetchMissingSequence) { | |
343 | std::string oid = this->get_temp_oid(); | |
344 | ||
345 | cls::journal::ObjectSetPosition commit_position; | |
346 | ||
347 | ASSERT_EQ(0, this->create(oid, 14, 4)); | |
348 | ASSERT_EQ(0, this->client_register(oid)); | |
349 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
350 | ||
9f95a23c | 351 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
352 | ASSERT_EQ(0, this->init_metadata(metadata)); |
353 | ||
354 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
355 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
356 | C_SaferCond unwatch_ctx; | |
357 | player->shut_down(&unwatch_ctx); | |
358 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
359 | }; | |
360 | ||
361 | ASSERT_EQ(0, metadata->set_active_set(1)); | |
362 | ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852)); | |
363 | ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856)); | |
364 | ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860)); | |
365 | ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853)); | |
366 | ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857)); | |
367 | ASSERT_EQ(0, this->write_entry(oid, 5, 2, 861)); | |
368 | ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854)); | |
369 | ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0)); | |
370 | ASSERT_EQ(0, this->write_entry(oid, 5, 3, 1)); | |
371 | ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2)); | |
372 | ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3)); | |
373 | ||
374 | player->prefetch(); | |
375 | Entries entries; | |
376 | ASSERT_TRUE(this->wait_for_entries(player, 7, &entries)); | |
377 | ||
378 | Entries expected_entries = { | |
379 | this->create_entry(2, 852), | |
380 | this->create_entry(2, 853), | |
381 | this->create_entry(2, 854), | |
382 | this->create_entry(3, 0), | |
383 | this->create_entry(3, 1), | |
384 | this->create_entry(3, 2), | |
385 | this->create_entry(3, 3)}; | |
386 | ASSERT_EQ(expected_entries, entries); | |
387 | ||
388 | ASSERT_TRUE(this->wait_for_complete(player)); | |
389 | ASSERT_EQ(0, this->m_replay_hander.complete_result); | |
390 | } | |
391 | ||
392 | TYPED_TEST(TestJournalPlayer, PrefetchLargeMissingSequence) { | |
393 | std::string oid = this->get_temp_oid(); | |
394 | ||
395 | cls::journal::ObjectSetPosition commit_position; | |
396 | ||
397 | ASSERT_EQ(0, this->create(oid)); | |
398 | ASSERT_EQ(0, this->client_register(oid)); | |
399 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
400 | ||
9f95a23c | 401 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
402 | ASSERT_EQ(0, this->init_metadata(metadata)); |
403 | ||
404 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
405 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
406 | C_SaferCond unwatch_ctx; | |
407 | player->shut_down(&unwatch_ctx); | |
408 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
409 | }; | |
410 | ||
411 | ASSERT_EQ(0, metadata->set_active_set(2)); | |
412 | ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0)); | |
413 | ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); | |
414 | ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3)); | |
415 | ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0)); | |
416 | ||
417 | player->prefetch(); | |
418 | Entries entries; | |
419 | ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); | |
420 | ||
421 | Entries expected_entries = { | |
422 | this->create_entry(0, 0), | |
423 | this->create_entry(0, 1), | |
424 | this->create_entry(1, 0)}; | |
425 | ASSERT_EQ(expected_entries, entries); | |
426 | } | |
427 | ||
428 | TYPED_TEST(TestJournalPlayer, PrefetchBlockedNewTag) { | |
429 | std::string oid = this->get_temp_oid(); | |
430 | ||
431 | cls::journal::ObjectSetPosition commit_position; | |
432 | ||
433 | ASSERT_EQ(0, this->create(oid)); | |
434 | ASSERT_EQ(0, this->client_register(oid)); | |
435 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
436 | ||
9f95a23c | 437 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
438 | ASSERT_EQ(0, this->init_metadata(metadata)); |
439 | ||
440 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
441 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
442 | C_SaferCond unwatch_ctx; | |
443 | player->shut_down(&unwatch_ctx); | |
444 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
445 | }; | |
446 | ||
447 | ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0)); | |
448 | ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); | |
449 | ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2)); | |
450 | ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4)); | |
451 | ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0)); | |
452 | ||
453 | player->prefetch(); | |
454 | Entries entries; | |
455 | ASSERT_TRUE(this->wait_for_entries(player, 4, &entries)); | |
456 | ||
457 | Entries expected_entries = { | |
458 | this->create_entry(0, 0), | |
459 | this->create_entry(0, 1), | |
460 | this->create_entry(0, 2), | |
461 | this->create_entry(1, 0)}; | |
462 | ASSERT_EQ(expected_entries, entries); | |
463 | } | |
464 | ||
465 | TYPED_TEST(TestJournalPlayer, PrefetchStaleEntries) { | |
466 | std::string oid = this->get_temp_oid(); | |
467 | ||
468 | journal::JournalPlayer::ObjectPositions positions = { | |
469 | cls::journal::ObjectPosition(0, 1, 0) }; | |
470 | cls::journal::ObjectSetPosition commit_position(positions); | |
471 | ||
472 | ASSERT_EQ(0, this->create(oid)); | |
473 | ASSERT_EQ(0, this->client_register(oid)); | |
474 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
475 | ||
9f95a23c | 476 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
477 | ASSERT_EQ(0, this->init_metadata(metadata)); |
478 | ||
479 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
480 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
481 | C_SaferCond unwatch_ctx; | |
482 | player->shut_down(&unwatch_ctx); | |
483 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
484 | }; | |
485 | ||
486 | ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); | |
487 | ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3)); | |
488 | ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0)); | |
489 | ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1)); | |
490 | ||
491 | player->prefetch(); | |
492 | Entries entries; | |
493 | ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); | |
494 | ||
495 | Entries expected_entries = { | |
496 | this->create_entry(1, 1)}; | |
497 | ASSERT_EQ(expected_entries, entries); | |
498 | ||
499 | ASSERT_TRUE(this->wait_for_complete(player)); | |
500 | ASSERT_EQ(0, this->m_replay_hander.complete_result); | |
501 | } | |
502 | ||
503 | TYPED_TEST(TestJournalPlayer, PrefetchUnexpectedTag) { | |
504 | std::string oid = this->get_temp_oid(); | |
505 | ||
506 | cls::journal::ObjectSetPosition commit_position; | |
507 | ||
508 | ASSERT_EQ(0, this->create(oid)); | |
509 | ASSERT_EQ(0, this->client_register(oid)); | |
510 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
511 | ||
9f95a23c | 512 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
513 | ASSERT_EQ(0, this->init_metadata(metadata)); |
514 | ||
515 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
516 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
517 | C_SaferCond unwatch_ctx; | |
518 | player->shut_down(&unwatch_ctx); | |
519 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
520 | }; | |
521 | ||
522 | ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120)); | |
523 | ASSERT_EQ(0, this->write_entry(oid, 1, 235, 121)); | |
524 | ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124)); | |
525 | ||
526 | player->prefetch(); | |
527 | Entries entries; | |
528 | ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); | |
529 | ||
530 | journal::Entry entry; | |
531 | uint64_t commit_tid; | |
532 | ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); | |
533 | ASSERT_TRUE(this->wait_for_complete(player)); | |
534 | ASSERT_EQ(0, this->m_replay_hander.complete_result); | |
535 | } | |
536 | ||
537 | TYPED_TEST(TestJournalPlayer, PrefetchAndWatch) { | |
538 | std::string oid = this->get_temp_oid(); | |
539 | ||
540 | journal::JournalPlayer::ObjectPositions positions; | |
541 | positions = { | |
542 | cls::journal::ObjectPosition(0, 234, 122)}; | |
543 | cls::journal::ObjectSetPosition commit_position(positions); | |
544 | ||
545 | ASSERT_EQ(0, this->create(oid)); | |
546 | ASSERT_EQ(0, this->client_register(oid)); | |
547 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
548 | ||
9f95a23c | 549 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
550 | ASSERT_EQ(0, this->init_metadata(metadata)); |
551 | ||
552 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
553 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
554 | C_SaferCond unwatch_ctx; | |
555 | player->shut_down(&unwatch_ctx); | |
556 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
557 | }; | |
558 | ||
559 | ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122)); | |
560 | ||
561 | player->prefetch_and_watch(0.25); | |
562 | ||
563 | Entries entries; | |
564 | ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123)); | |
565 | ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); | |
566 | ||
567 | Entries expected_entries; | |
568 | expected_entries = {this->create_entry(234, 123)}; | |
569 | ASSERT_EQ(expected_entries, entries); | |
570 | ||
571 | ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124)); | |
572 | ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); | |
573 | ||
574 | expected_entries = {this->create_entry(234, 124)}; | |
575 | ASSERT_EQ(expected_entries, entries); | |
576 | } | |
577 | ||
578 | TYPED_TEST(TestJournalPlayer, PrefetchSkippedObject) { | |
579 | std::string oid = this->get_temp_oid(); | |
580 | ||
581 | cls::journal::ObjectSetPosition commit_position; | |
582 | ||
583 | ASSERT_EQ(0, this->create(oid, 14, 3)); | |
584 | ASSERT_EQ(0, this->client_register(oid)); | |
585 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
586 | ||
9f95a23c | 587 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
588 | ASSERT_EQ(0, this->init_metadata(metadata)); |
589 | ASSERT_EQ(0, metadata->set_active_set(2)); | |
590 | ||
591 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
592 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
593 | C_SaferCond unwatch_ctx; | |
594 | player->shut_down(&unwatch_ctx); | |
595 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
596 | }; | |
597 | ||
598 | ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122)); | |
599 | ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123)); | |
600 | ASSERT_EQ(0, this->write_entry(oid, 5, 234, 124)); | |
601 | ASSERT_EQ(0, this->write_entry(oid, 6, 234, 125)); | |
602 | ASSERT_EQ(0, this->write_entry(oid, 7, 234, 126)); | |
603 | ||
604 | player->prefetch(); | |
605 | ||
606 | Entries entries; | |
607 | ASSERT_TRUE(this->wait_for_entries(player, 5, &entries)); | |
608 | ASSERT_TRUE(this->wait_for_complete(player)); | |
609 | ||
610 | Entries expected_entries; | |
611 | expected_entries = { | |
612 | this->create_entry(234, 122), | |
613 | this->create_entry(234, 123), | |
614 | this->create_entry(234, 124), | |
615 | this->create_entry(234, 125), | |
616 | this->create_entry(234, 126)}; | |
617 | ASSERT_EQ(expected_entries, entries); | |
618 | ||
619 | uint64_t last_tid; | |
620 | ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid)); | |
621 | ASSERT_EQ(126U, last_tid); | |
622 | } | |
623 | ||
624 | TYPED_TEST(TestJournalPlayer, ImbalancedJournal) { | |
625 | std::string oid = this->get_temp_oid(); | |
626 | ||
627 | journal::JournalPlayer::ObjectPositions positions = { | |
628 | cls::journal::ObjectPosition(9, 300, 1), | |
629 | cls::journal::ObjectPosition(8, 300, 0), | |
630 | cls::journal::ObjectPosition(10, 200, 4334), | |
631 | cls::journal::ObjectPosition(11, 200, 4331) }; | |
632 | cls::journal::ObjectSetPosition commit_position(positions); | |
633 | ||
634 | ASSERT_EQ(0, this->create(oid, 14, 4)); | |
635 | ASSERT_EQ(0, this->client_register(oid)); | |
636 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
637 | ||
9f95a23c | 638 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
639 | ASSERT_EQ(0, this->init_metadata(metadata)); |
640 | ASSERT_EQ(0, metadata->set_active_set(2)); | |
641 | metadata->set_minimum_set(2); | |
642 | ||
643 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
644 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
645 | C_SaferCond unwatch_ctx; | |
646 | player->shut_down(&unwatch_ctx); | |
647 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
648 | }; | |
649 | ||
650 | ASSERT_EQ(0, this->write_entry(oid, 8, 300, 0)); | |
651 | ASSERT_EQ(0, this->write_entry(oid, 8, 301, 0)); | |
652 | ASSERT_EQ(0, this->write_entry(oid, 9, 300, 1)); | |
653 | ASSERT_EQ(0, this->write_entry(oid, 9, 301, 1)); | |
654 | ASSERT_EQ(0, this->write_entry(oid, 10, 200, 4334)); | |
655 | ASSERT_EQ(0, this->write_entry(oid, 10, 301, 2)); | |
656 | ASSERT_EQ(0, this->write_entry(oid, 11, 200, 4331)); | |
657 | ASSERT_EQ(0, this->write_entry(oid, 11, 301, 3)); | |
658 | ||
659 | player->prefetch(); | |
660 | ||
661 | Entries entries; | |
662 | ASSERT_TRUE(this->wait_for_entries(player, 4, &entries)); | |
663 | ASSERT_TRUE(this->wait_for_complete(player)); | |
664 | ||
665 | Entries expected_entries; | |
666 | expected_entries = { | |
667 | this->create_entry(301, 0), | |
668 | this->create_entry(301, 1), | |
669 | this->create_entry(301, 2), | |
670 | this->create_entry(301, 3)}; | |
671 | ASSERT_EQ(expected_entries, entries); | |
672 | ||
673 | uint64_t last_tid; | |
674 | ASSERT_TRUE(metadata->get_last_allocated_entry_tid(301, &last_tid)); | |
675 | ASSERT_EQ(3U, last_tid); | |
676 | } | |
677 | ||
678 | TYPED_TEST(TestJournalPlayer, LiveReplayLaggyAppend) { | |
679 | std::string oid = this->get_temp_oid(); | |
680 | ||
681 | cls::journal::ObjectSetPosition commit_position; | |
682 | ||
683 | ASSERT_EQ(0, this->create(oid)); | |
684 | ASSERT_EQ(0, this->client_register(oid)); | |
685 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
686 | ||
9f95a23c | 687 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
688 | ASSERT_EQ(0, this->init_metadata(metadata)); |
689 | ||
690 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
691 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
692 | C_SaferCond unwatch_ctx; | |
693 | player->shut_down(&unwatch_ctx); | |
694 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
695 | }; | |
696 | ||
697 | ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0)); | |
698 | ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); | |
699 | ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2)); | |
700 | ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4)); | |
701 | ASSERT_EQ(0, this->write_entry(oid, 3, 0, 5)); // laggy entry 0/3 in object 1 | |
702 | player->prefetch_and_watch(0.25); | |
703 | ||
704 | Entries entries; | |
705 | ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); | |
706 | ||
707 | Entries expected_entries = { | |
708 | this->create_entry(0, 0), | |
709 | this->create_entry(0, 1), | |
710 | this->create_entry(0, 2)}; | |
711 | ASSERT_EQ(expected_entries, entries); | |
712 | ||
713 | journal::Entry entry; | |
714 | uint64_t commit_tid; | |
715 | ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); | |
716 | ||
717 | ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3)); | |
718 | ASSERT_EQ(0, metadata->set_active_set(1)); | |
719 | ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); | |
720 | ||
721 | expected_entries = { | |
722 | this->create_entry(0, 3), | |
723 | this->create_entry(0, 4), | |
724 | this->create_entry(0, 5)}; | |
725 | ASSERT_EQ(expected_entries, entries); | |
726 | } | |
727 | ||
728 | TYPED_TEST(TestJournalPlayer, LiveReplayMissingSequence) { | |
729 | std::string oid = this->get_temp_oid(); | |
730 | ||
731 | cls::journal::ObjectSetPosition commit_position; | |
732 | ||
733 | ASSERT_EQ(0, this->create(oid, 14, 4)); | |
734 | ASSERT_EQ(0, this->client_register(oid)); | |
735 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
736 | ||
9f95a23c | 737 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
738 | ASSERT_EQ(0, this->init_metadata(metadata)); |
739 | ||
740 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
741 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
742 | C_SaferCond unwatch_ctx; | |
743 | player->shut_down(&unwatch_ctx); | |
744 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
745 | }; | |
746 | ||
747 | ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852)); | |
748 | ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856)); | |
749 | ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860)); | |
750 | ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853)); | |
751 | ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857)); | |
752 | ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854)); | |
753 | ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856)); | |
754 | player->prefetch_and_watch(0.25); | |
755 | ||
756 | Entries entries; | |
757 | ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); | |
758 | ||
759 | Entries expected_entries = { | |
760 | this->create_entry(2, 852), | |
761 | this->create_entry(2, 853), | |
762 | this->create_entry(2, 854)}; | |
763 | ASSERT_EQ(expected_entries, entries); | |
764 | ||
765 | journal::Entry entry; | |
766 | uint64_t commit_tid; | |
767 | ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); | |
768 | ||
769 | ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3)); | |
770 | ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2)); | |
771 | ASSERT_EQ(0, this->write_entry(oid, 1, 3, 1)); | |
772 | ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0)); | |
773 | ASSERT_TRUE(this->wait_for_entries(player, 4, &entries)); | |
774 | ||
775 | expected_entries = { | |
776 | this->create_entry(3, 0), | |
777 | this->create_entry(3, 1), | |
778 | this->create_entry(3, 2), | |
779 | this->create_entry(3, 3)}; | |
780 | ASSERT_EQ(expected_entries, entries); | |
781 | } | |
782 | ||
783 | TYPED_TEST(TestJournalPlayer, LiveReplayLargeMissingSequence) { | |
784 | std::string oid = this->get_temp_oid(); | |
785 | ||
786 | cls::journal::ObjectSetPosition commit_position; | |
787 | ||
788 | ASSERT_EQ(0, this->create(oid)); | |
789 | ASSERT_EQ(0, this->client_register(oid)); | |
790 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
791 | ||
9f95a23c | 792 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
793 | ASSERT_EQ(0, this->init_metadata(metadata)); |
794 | ||
795 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
796 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
797 | C_SaferCond unwatch_ctx; | |
798 | player->shut_down(&unwatch_ctx); | |
799 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
800 | }; | |
801 | ||
802 | ASSERT_EQ(0, metadata->set_active_set(2)); | |
803 | ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0)); | |
804 | ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); | |
805 | ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3)); | |
806 | ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0)); | |
807 | player->prefetch_and_watch(0.25); | |
808 | ||
809 | Entries entries; | |
810 | ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); | |
811 | ||
812 | Entries expected_entries = { | |
813 | this->create_entry(0, 0), | |
814 | this->create_entry(0, 1), | |
815 | this->create_entry(1, 0)}; | |
816 | ASSERT_EQ(expected_entries, entries); | |
817 | } | |
818 | ||
819 | TYPED_TEST(TestJournalPlayer, LiveReplayBlockedNewTag) { | |
820 | std::string oid = this->get_temp_oid(); | |
821 | ||
822 | cls::journal::ObjectSetPosition commit_position; | |
823 | ||
824 | ASSERT_EQ(0, this->create(oid)); | |
825 | ASSERT_EQ(0, this->client_register(oid)); | |
826 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
827 | ||
9f95a23c | 828 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
829 | ASSERT_EQ(0, this->init_metadata(metadata)); |
830 | ||
831 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
832 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
833 | C_SaferCond unwatch_ctx; | |
834 | player->shut_down(&unwatch_ctx); | |
835 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
836 | }; | |
837 | ||
838 | C_SaferCond ctx1; | |
839 | cls::journal::Tag tag1; | |
840 | metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1); | |
841 | ASSERT_EQ(0, ctx1.wait()); | |
842 | ||
843 | ASSERT_EQ(0, metadata->set_active_set(0)); | |
844 | ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 0)); | |
845 | ASSERT_EQ(0, this->write_entry(oid, 1, tag1.tid, 1)); | |
846 | ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 2)); | |
847 | ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 4)); | |
848 | player->prefetch_and_watch(0.25); | |
849 | ||
850 | Entries entries; | |
851 | ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); | |
852 | ||
853 | Entries expected_entries = { | |
854 | this->create_entry(tag1.tid, 0), | |
855 | this->create_entry(tag1.tid, 1), | |
856 | this->create_entry(tag1.tid, 2)}; | |
857 | ASSERT_EQ(expected_entries, entries); | |
858 | ||
859 | journal::Entry entry; | |
860 | uint64_t commit_tid; | |
861 | ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); | |
862 | ||
863 | C_SaferCond ctx2; | |
864 | cls::journal::Tag tag2; | |
865 | metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx2); | |
866 | ASSERT_EQ(0, ctx2.wait()); | |
867 | ||
868 | ASSERT_EQ(0, this->write_entry(oid, 0, tag2.tid, 0)); | |
869 | ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); | |
870 | ||
871 | expected_entries = { | |
872 | this->create_entry(tag2.tid, 0)}; | |
873 | ASSERT_EQ(expected_entries, entries); | |
874 | } | |
875 | ||
876 | TYPED_TEST(TestJournalPlayer, LiveReplayStaleEntries) { | |
877 | std::string oid = this->get_temp_oid(); | |
878 | ||
879 | journal::JournalPlayer::ObjectPositions positions = { | |
880 | cls::journal::ObjectPosition(0, 1, 0) }; | |
881 | cls::journal::ObjectSetPosition commit_position(positions); | |
882 | ||
883 | ASSERT_EQ(0, this->create(oid)); | |
884 | ASSERT_EQ(0, this->client_register(oid)); | |
885 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
886 | ||
9f95a23c | 887 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
888 | ASSERT_EQ(0, this->init_metadata(metadata)); |
889 | ||
890 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
891 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
892 | C_SaferCond unwatch_ctx; | |
893 | player->shut_down(&unwatch_ctx); | |
894 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
895 | }; | |
896 | ||
897 | ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); | |
898 | ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3)); | |
899 | ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0)); | |
900 | ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1)); | |
901 | player->prefetch_and_watch(0.25); | |
902 | ||
903 | Entries entries; | |
904 | ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); | |
905 | ||
906 | Entries expected_entries = { | |
907 | this->create_entry(1, 1)}; | |
908 | ASSERT_EQ(expected_entries, entries); | |
909 | } | |
910 | ||
911 | TYPED_TEST(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) { | |
912 | std::string oid = this->get_temp_oid(); | |
913 | ||
914 | journal::JournalPlayer::ObjectPositions positions = { | |
915 | cls::journal::ObjectPosition(1, 0, 1), | |
916 | cls::journal::ObjectPosition(0, 0, 0)}; | |
917 | cls::journal::ObjectSetPosition commit_position(positions); | |
918 | ||
919 | ASSERT_EQ(0, this->create(oid)); | |
920 | ASSERT_EQ(0, this->client_register(oid)); | |
921 | ASSERT_EQ(0, this->client_commit(oid, commit_position)); | |
922 | ||
9f95a23c | 923 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
924 | ASSERT_EQ(0, this->init_metadata(metadata)); |
925 | ||
926 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
927 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
928 | C_SaferCond unwatch_ctx; | |
929 | player->shut_down(&unwatch_ctx); | |
930 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
931 | }; | |
932 | ||
933 | ASSERT_EQ(0, metadata->set_active_set(1)); | |
934 | ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0)); | |
935 | ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); | |
936 | ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3)); | |
937 | ASSERT_EQ(0, this->write_entry(oid, 2, 1, 0)); | |
938 | player->prefetch_and_watch(0.25); | |
939 | ||
940 | Entries entries; | |
941 | ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); | |
942 | ||
943 | Entries expected_entries = { | |
944 | this->create_entry(1, 0)}; | |
945 | ASSERT_EQ(expected_entries, entries); | |
946 | ||
947 | // should remove player for offset 3 after refetching | |
948 | ASSERT_EQ(0, metadata->set_active_set(3)); | |
949 | ASSERT_EQ(0, this->write_entry(oid, 7, 1, 1)); | |
950 | ||
951 | ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); | |
952 | ||
953 | expected_entries = { | |
954 | this->create_entry(1, 1)}; | |
955 | ASSERT_EQ(expected_entries, entries); | |
956 | } | |
957 | ||
958 | TYPED_TEST(TestJournalPlayer, PrefechShutDown) { | |
959 | std::string oid = this->get_temp_oid(); | |
960 | ||
961 | ASSERT_EQ(0, this->create(oid)); | |
962 | ASSERT_EQ(0, this->client_register(oid)); | |
963 | ASSERT_EQ(0, this->client_commit(oid, {})); | |
964 | ||
9f95a23c | 965 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
966 | ASSERT_EQ(0, this->init_metadata(metadata)); |
967 | ||
968 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
969 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
970 | C_SaferCond unwatch_ctx; | |
971 | player->shut_down(&unwatch_ctx); | |
972 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
973 | }; | |
974 | player->prefetch(); | |
975 | } | |
976 | ||
977 | TYPED_TEST(TestJournalPlayer, LiveReplayShutDown) { | |
978 | std::string oid = this->get_temp_oid(); | |
979 | ||
980 | ASSERT_EQ(0, this->create(oid)); | |
981 | ASSERT_EQ(0, this->client_register(oid)); | |
982 | ASSERT_EQ(0, this->client_commit(oid, {})); | |
983 | ||
9f95a23c | 984 | auto metadata = this->create_metadata(oid); |
7c673cae FG |
985 | ASSERT_EQ(0, this->init_metadata(metadata)); |
986 | ||
987 | journal::JournalPlayer *player = this->create_player(oid, metadata); | |
988 | BOOST_SCOPE_EXIT_ALL( (player) ) { | |
989 | C_SaferCond unwatch_ctx; | |
990 | player->shut_down(&unwatch_ctx); | |
991 | ASSERT_EQ(0, unwatch_ctx.wait()); | |
992 | }; | |
993 | player->prefetch_and_watch(0.25); | |
994 | } | |
995 |