]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/journal/test_JournalPlayer.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / journal / test_JournalPlayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "journal/JournalPlayer.h"
5#include "journal/Entry.h"
6#include "journal/JournalMetadata.h"
7#include "journal/ReplayHandler.h"
8#include "include/stringify.h"
9f95a23c 9#include "common/ceph_mutex.h"
7c673cae
FG
10#include "gtest/gtest.h"
11#include "test/journal/RadosTestFixture.h"
12#include <list>
13#include <boost/scope_exit.hpp>
14
20effc67 15using namespace std::chrono_literals;
7c673cae
FG
16typedef std::list<journal::Entry> Entries;
17
18template <typename T>
19class TestJournalPlayer : public RadosTestFixture {
20public:
21 typedef std::list<journal::JournalPlayer *> JournalPlayers;
22
23 static const uint64_t max_fetch_bytes = T::max_fetch_bytes;
24
25 struct ReplayHandler : public journal::ReplayHandler {
9f95a23c
TL
26 ceph::mutex lock = ceph::make_mutex("lock");
27 ceph::condition_variable cond;
7c673cae
FG
28 bool entries_available;
29 bool complete;
30 int complete_result;
31
32 ReplayHandler()
9f95a23c 33 : entries_available(false), complete(false),
7c673cae
FG
34 complete_result(0) {}
35
7c673cae 36 void handle_entries_available() override {
9f95a23c 37 std::lock_guard locker{lock};
7c673cae 38 entries_available = true;
9f95a23c 39 cond.notify_all();
7c673cae
FG
40 }
41
42 void handle_complete(int r) override {
9f95a23c 43 std::lock_guard locker{lock};
7c673cae
FG
44 complete = true;
45 complete_result = r;
9f95a23c 46 cond.notify_all();
7c673cae
FG
47 }
48 };
49
50 void TearDown() override {
51 for (JournalPlayers::iterator it = m_players.begin();
52 it != m_players.end(); ++it) {
53 delete *it;
54 }
55 RadosTestFixture::TearDown();
56 }
57
9f95a23c 58 auto create_metadata(const std::string &oid) {
7c673cae
FG
59 return RadosTestFixture::create_metadata(oid, "client", 0.1,
60 max_fetch_bytes);
61 }
62
63 int client_commit(const std::string &oid,
64 journal::JournalPlayer::ObjectSetPosition position) {
65 return RadosTestFixture::client_commit(oid, "client", position);
66 }
67
68 journal::Entry create_entry(uint64_t tag_tid, uint64_t entry_tid) {
69 std::string payload(128, '0');
70 bufferlist payload_bl;
71 payload_bl.append(payload);
72 return journal::Entry(tag_tid, entry_tid, payload_bl);
73 }
74
75 journal::JournalPlayer *create_player(const std::string &oid,
9f95a23c 76 const ceph::ref_t<journal::JournalMetadata>& metadata) {
7c673cae 77 journal::JournalPlayer *player(new journal::JournalPlayer(
9f95a23c 78 m_ioctx, oid + ".", metadata, &m_replay_hander, nullptr));
7c673cae
FG
79 m_players.push_back(player);
80 return player;
81 }
82
83 bool wait_for_entries(journal::JournalPlayer *player, uint32_t count,
84 Entries *entries) {
85 entries->clear();
86 while (entries->size() < count) {
87 journal::Entry entry;
88 uint64_t commit_tid;
89 while (entries->size() < count &&
90 player->try_pop_front(&entry, &commit_tid)) {
91 entries->push_back(entry);
92 }
93 if (entries->size() == count) {
94 break;
95 }
96
9f95a23c 97 std::unique_lock locker{m_replay_hander.lock};
7c673cae
FG
98 if (m_replay_hander.entries_available) {
99 m_replay_hander.entries_available = false;
9f95a23c
TL
100 } else if (m_replay_hander.cond.wait_for(locker, 10s) ==
101 std::cv_status::timeout) {
7c673cae
FG
102 break;
103 }
104 }
105 return entries->size() == count;
106 }
107
108 bool wait_for_complete(journal::JournalPlayer *player) {
9f95a23c 109 std::unique_lock locker{m_replay_hander.lock};
7c673cae
FG
110 while (!m_replay_hander.complete) {
111 journal::Entry entry;
112 uint64_t commit_tid;
113 player->try_pop_front(&entry, &commit_tid);
114
9f95a23c
TL
115 if (m_replay_hander.cond.wait_for(locker, 10s) ==
116 std::cv_status::timeout) {
7c673cae
FG
117 return false;
118 }
119 }
120 m_replay_hander.complete = false;
121 return true;
122 }
123
124 int write_entry(const std::string &oid, uint64_t object_num,
125 uint64_t tag_tid, uint64_t entry_tid) {
126 bufferlist bl;
11fdf7f2 127 encode(create_entry(tag_tid, entry_tid), bl);
7c673cae
FG
128 return append(oid + "." + stringify(object_num), bl);
129 }
130
131 JournalPlayers m_players;
132 ReplayHandler m_replay_hander;
133};
134
// Type-level carrier for the max_fetch_bytes value used to parameterize
// the TestJournalPlayer fixture.
template <uint64_t MaxFetchBytes>
struct TestJournalPlayerParams {
  static const uint64_t max_fetch_bytes = MaxFetchBytes;
};
140
141typedef ::testing::Types<TestJournalPlayerParams<0>,
142 TestJournalPlayerParams<16> > TestJournalPlayerTypes;
9f95a23c 143TYPED_TEST_SUITE(TestJournalPlayer, TestJournalPlayerTypes);
7c673cae
FG
144
145TYPED_TEST(TestJournalPlayer, Prefetch) {
146 std::string oid = this->get_temp_oid();
147
148 journal::JournalPlayer::ObjectPositions positions;
149 positions = {
150 cls::journal::ObjectPosition(0, 234, 122) };
151 cls::journal::ObjectSetPosition commit_position(positions);
152
153 ASSERT_EQ(0, this->create(oid));
154 ASSERT_EQ(0, this->client_register(oid));
155 ASSERT_EQ(0, this->client_commit(oid, commit_position));
156
9f95a23c 157 auto metadata = this->create_metadata(oid);
7c673cae
FG
158 ASSERT_EQ(0, this->init_metadata(metadata));
159
160 journal::JournalPlayer *player = this->create_player(oid, metadata);
161 BOOST_SCOPE_EXIT_ALL( (player) ) {
162 C_SaferCond unwatch_ctx;
163 player->shut_down(&unwatch_ctx);
164 ASSERT_EQ(0, unwatch_ctx.wait());
165 };
166
167 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
168 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
169 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
170 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125));
171
172 player->prefetch();
173
174 Entries entries;
175 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
176 ASSERT_TRUE(this->wait_for_complete(player));
177
178 Entries expected_entries;
179 expected_entries = {
180 this->create_entry(234, 123),
181 this->create_entry(234, 124),
182 this->create_entry(234, 125)};
183 ASSERT_EQ(expected_entries, entries);
184
185 uint64_t last_tid;
186 ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
187 ASSERT_EQ(125U, last_tid);
188}
189
190TYPED_TEST(TestJournalPlayer, PrefetchSkip) {
191 std::string oid = this->get_temp_oid();
192
193 journal::JournalPlayer::ObjectPositions positions;
194 positions = {
195 cls::journal::ObjectPosition(0, 234, 125),
196 cls::journal::ObjectPosition(1, 234, 124) };
197 cls::journal::ObjectSetPosition commit_position(positions);
198
199 ASSERT_EQ(0, this->create(oid));
200 ASSERT_EQ(0, this->client_register(oid));
201 ASSERT_EQ(0, this->client_commit(oid, commit_position));
202
9f95a23c 203 auto metadata = this->create_metadata(oid);
7c673cae
FG
204 ASSERT_EQ(0, this->init_metadata(metadata));
205
206 journal::JournalPlayer *player = this->create_player(oid, metadata);
207 BOOST_SCOPE_EXIT_ALL( (player) ) {
208 C_SaferCond unwatch_ctx;
209 player->shut_down(&unwatch_ctx);
210 ASSERT_EQ(0, unwatch_ctx.wait());
211 };
212
213 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
214 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
215 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
216 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125));
217
218 player->prefetch();
219
220 Entries entries;
221 ASSERT_TRUE(this->wait_for_entries(player, 0, &entries));
222 ASSERT_TRUE(this->wait_for_complete(player));
223
224 uint64_t last_tid;
225 ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
226 ASSERT_EQ(125U, last_tid);
227}
228
229TYPED_TEST(TestJournalPlayer, PrefetchWithoutCommit) {
230 std::string oid = this->get_temp_oid();
231
232 cls::journal::ObjectSetPosition commit_position;
233
234 ASSERT_EQ(0, this->create(oid));
235 ASSERT_EQ(0, this->client_register(oid));
236 ASSERT_EQ(0, this->client_commit(oid, commit_position));
237
9f95a23c 238 auto metadata = this->create_metadata(oid);
7c673cae
FG
239 ASSERT_EQ(0, this->init_metadata(metadata));
240
241 journal::JournalPlayer *player = this->create_player(oid, metadata);
242 BOOST_SCOPE_EXIT_ALL( (player) ) {
243 C_SaferCond unwatch_ctx;
244 player->shut_down(&unwatch_ctx);
245 ASSERT_EQ(0, unwatch_ctx.wait());
246 };
247
248 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
249 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
250
251 player->prefetch();
252
253 Entries entries;
254 ASSERT_TRUE(this->wait_for_entries(player, 2, &entries));
255 ASSERT_TRUE(this->wait_for_complete(player));
256
257 Entries expected_entries;
258 expected_entries = {
259 this->create_entry(234, 122),
260 this->create_entry(234, 123)};
261 ASSERT_EQ(expected_entries, entries);
262}
263
264TYPED_TEST(TestJournalPlayer, PrefetchMultipleTags) {
265 std::string oid = this->get_temp_oid();
266
267 journal::JournalPlayer::ObjectPositions positions;
268 positions = {
269 cls::journal::ObjectPosition(2, 234, 122),
270 cls::journal::ObjectPosition(1, 234, 121),
271 cls::journal::ObjectPosition(0, 234, 120)};
272 cls::journal::ObjectSetPosition commit_position(positions);
273
274 ASSERT_EQ(0, this->create(oid, 14, 3));
275 ASSERT_EQ(0, this->client_register(oid));
276 ASSERT_EQ(0, this->client_commit(oid, commit_position));
277
9f95a23c 278 auto metadata = this->create_metadata(oid);
7c673cae
FG
279 ASSERT_EQ(0, this->init_metadata(metadata));
280
281 journal::JournalPlayer *player = this->create_player(oid, metadata);
282 BOOST_SCOPE_EXIT_ALL( (player) ) {
283 C_SaferCond unwatch_ctx;
284 player->shut_down(&unwatch_ctx);
285 ASSERT_EQ(0, unwatch_ctx.wait());
286 };
287
288 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
289 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121));
290 ASSERT_EQ(0, this->write_entry(oid, 2, 234, 122));
291 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 123));
292 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 124));
293 ASSERT_EQ(0, this->write_entry(oid, 0, 236, 0)); // new tag allocated
294
295 player->prefetch();
296
297 Entries entries;
298 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
299 ASSERT_TRUE(this->wait_for_complete(player));
300
301 uint64_t last_tid;
302 ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
303 ASSERT_EQ(124U, last_tid);
304 ASSERT_TRUE(metadata->get_last_allocated_entry_tid(236, &last_tid));
305 ASSERT_EQ(0U, last_tid);
306}
307
308TYPED_TEST(TestJournalPlayer, PrefetchCorruptSequence) {
309 std::string oid = this->get_temp_oid();
310
311 cls::journal::ObjectSetPosition commit_position;
312
313 ASSERT_EQ(0, this->create(oid));
314 ASSERT_EQ(0, this->client_register(oid));
315 ASSERT_EQ(0, this->client_commit(oid, commit_position));
316
9f95a23c 317 auto metadata = this->create_metadata(oid);
7c673cae
FG
318 ASSERT_EQ(0, this->init_metadata(metadata));
319
320 journal::JournalPlayer *player = this->create_player(oid, metadata);
321 BOOST_SCOPE_EXIT_ALL( (player) ) {
322 C_SaferCond unwatch_ctx;
323 player->shut_down(&unwatch_ctx);
324 ASSERT_EQ(0, unwatch_ctx.wait());
325 };
326
327 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
328 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121));
329 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
330
331 player->prefetch();
332 Entries entries;
333 ASSERT_TRUE(this->wait_for_entries(player, 2, &entries));
334
335 journal::Entry entry;
336 uint64_t commit_tid;
337 ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
338 ASSERT_TRUE(this->wait_for_complete(player));
339 ASSERT_EQ(-ENOMSG, this->m_replay_hander.complete_result);
340}
341
342TYPED_TEST(TestJournalPlayer, PrefetchMissingSequence) {
343 std::string oid = this->get_temp_oid();
344
345 cls::journal::ObjectSetPosition commit_position;
346
347 ASSERT_EQ(0, this->create(oid, 14, 4));
348 ASSERT_EQ(0, this->client_register(oid));
349 ASSERT_EQ(0, this->client_commit(oid, commit_position));
350
9f95a23c 351 auto metadata = this->create_metadata(oid);
7c673cae
FG
352 ASSERT_EQ(0, this->init_metadata(metadata));
353
354 journal::JournalPlayer *player = this->create_player(oid, metadata);
355 BOOST_SCOPE_EXIT_ALL( (player) ) {
356 C_SaferCond unwatch_ctx;
357 player->shut_down(&unwatch_ctx);
358 ASSERT_EQ(0, unwatch_ctx.wait());
359 };
360
361 ASSERT_EQ(0, metadata->set_active_set(1));
362 ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852));
363 ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
364 ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860));
365 ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853));
366 ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857));
367 ASSERT_EQ(0, this->write_entry(oid, 5, 2, 861));
368 ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854));
369 ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0));
370 ASSERT_EQ(0, this->write_entry(oid, 5, 3, 1));
371 ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2));
372 ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3));
373
374 player->prefetch();
375 Entries entries;
376 ASSERT_TRUE(this->wait_for_entries(player, 7, &entries));
377
378 Entries expected_entries = {
379 this->create_entry(2, 852),
380 this->create_entry(2, 853),
381 this->create_entry(2, 854),
382 this->create_entry(3, 0),
383 this->create_entry(3, 1),
384 this->create_entry(3, 2),
385 this->create_entry(3, 3)};
386 ASSERT_EQ(expected_entries, entries);
387
388 ASSERT_TRUE(this->wait_for_complete(player));
389 ASSERT_EQ(0, this->m_replay_hander.complete_result);
390}
391
392TYPED_TEST(TestJournalPlayer, PrefetchLargeMissingSequence) {
393 std::string oid = this->get_temp_oid();
394
395 cls::journal::ObjectSetPosition commit_position;
396
397 ASSERT_EQ(0, this->create(oid));
398 ASSERT_EQ(0, this->client_register(oid));
399 ASSERT_EQ(0, this->client_commit(oid, commit_position));
400
9f95a23c 401 auto metadata = this->create_metadata(oid);
7c673cae
FG
402 ASSERT_EQ(0, this->init_metadata(metadata));
403
404 journal::JournalPlayer *player = this->create_player(oid, metadata);
405 BOOST_SCOPE_EXIT_ALL( (player) ) {
406 C_SaferCond unwatch_ctx;
407 player->shut_down(&unwatch_ctx);
408 ASSERT_EQ(0, unwatch_ctx.wait());
409 };
410
411 ASSERT_EQ(0, metadata->set_active_set(2));
412 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
413 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
414 ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
415 ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0));
416
417 player->prefetch();
418 Entries entries;
419 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
420
421 Entries expected_entries = {
422 this->create_entry(0, 0),
423 this->create_entry(0, 1),
424 this->create_entry(1, 0)};
425 ASSERT_EQ(expected_entries, entries);
426}
427
428TYPED_TEST(TestJournalPlayer, PrefetchBlockedNewTag) {
429 std::string oid = this->get_temp_oid();
430
431 cls::journal::ObjectSetPosition commit_position;
432
433 ASSERT_EQ(0, this->create(oid));
434 ASSERT_EQ(0, this->client_register(oid));
435 ASSERT_EQ(0, this->client_commit(oid, commit_position));
436
9f95a23c 437 auto metadata = this->create_metadata(oid);
7c673cae
FG
438 ASSERT_EQ(0, this->init_metadata(metadata));
439
440 journal::JournalPlayer *player = this->create_player(oid, metadata);
441 BOOST_SCOPE_EXIT_ALL( (player) ) {
442 C_SaferCond unwatch_ctx;
443 player->shut_down(&unwatch_ctx);
444 ASSERT_EQ(0, unwatch_ctx.wait());
445 };
446
447 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
448 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
449 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2));
450 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4));
451 ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
452
453 player->prefetch();
454 Entries entries;
455 ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
456
457 Entries expected_entries = {
458 this->create_entry(0, 0),
459 this->create_entry(0, 1),
460 this->create_entry(0, 2),
461 this->create_entry(1, 0)};
462 ASSERT_EQ(expected_entries, entries);
463}
464
465TYPED_TEST(TestJournalPlayer, PrefetchStaleEntries) {
466 std::string oid = this->get_temp_oid();
467
468 journal::JournalPlayer::ObjectPositions positions = {
469 cls::journal::ObjectPosition(0, 1, 0) };
470 cls::journal::ObjectSetPosition commit_position(positions);
471
472 ASSERT_EQ(0, this->create(oid));
473 ASSERT_EQ(0, this->client_register(oid));
474 ASSERT_EQ(0, this->client_commit(oid, commit_position));
475
9f95a23c 476 auto metadata = this->create_metadata(oid);
7c673cae
FG
477 ASSERT_EQ(0, this->init_metadata(metadata));
478
479 journal::JournalPlayer *player = this->create_player(oid, metadata);
480 BOOST_SCOPE_EXIT_ALL( (player) ) {
481 C_SaferCond unwatch_ctx;
482 player->shut_down(&unwatch_ctx);
483 ASSERT_EQ(0, unwatch_ctx.wait());
484 };
485
486 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
487 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
488 ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
489 ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1));
490
491 player->prefetch();
492 Entries entries;
493 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
494
495 Entries expected_entries = {
496 this->create_entry(1, 1)};
497 ASSERT_EQ(expected_entries, entries);
498
499 ASSERT_TRUE(this->wait_for_complete(player));
500 ASSERT_EQ(0, this->m_replay_hander.complete_result);
501}
502
503TYPED_TEST(TestJournalPlayer, PrefetchUnexpectedTag) {
504 std::string oid = this->get_temp_oid();
505
506 cls::journal::ObjectSetPosition commit_position;
507
508 ASSERT_EQ(0, this->create(oid));
509 ASSERT_EQ(0, this->client_register(oid));
510 ASSERT_EQ(0, this->client_commit(oid, commit_position));
511
9f95a23c 512 auto metadata = this->create_metadata(oid);
7c673cae
FG
513 ASSERT_EQ(0, this->init_metadata(metadata));
514
515 journal::JournalPlayer *player = this->create_player(oid, metadata);
516 BOOST_SCOPE_EXIT_ALL( (player) ) {
517 C_SaferCond unwatch_ctx;
518 player->shut_down(&unwatch_ctx);
519 ASSERT_EQ(0, unwatch_ctx.wait());
520 };
521
522 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
523 ASSERT_EQ(0, this->write_entry(oid, 1, 235, 121));
524 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
525
526 player->prefetch();
527 Entries entries;
528 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
529
530 journal::Entry entry;
531 uint64_t commit_tid;
532 ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
533 ASSERT_TRUE(this->wait_for_complete(player));
534 ASSERT_EQ(0, this->m_replay_hander.complete_result);
535}
536
537TYPED_TEST(TestJournalPlayer, PrefetchAndWatch) {
538 std::string oid = this->get_temp_oid();
539
540 journal::JournalPlayer::ObjectPositions positions;
541 positions = {
542 cls::journal::ObjectPosition(0, 234, 122)};
543 cls::journal::ObjectSetPosition commit_position(positions);
544
545 ASSERT_EQ(0, this->create(oid));
546 ASSERT_EQ(0, this->client_register(oid));
547 ASSERT_EQ(0, this->client_commit(oid, commit_position));
548
9f95a23c 549 auto metadata = this->create_metadata(oid);
7c673cae
FG
550 ASSERT_EQ(0, this->init_metadata(metadata));
551
552 journal::JournalPlayer *player = this->create_player(oid, metadata);
553 BOOST_SCOPE_EXIT_ALL( (player) ) {
554 C_SaferCond unwatch_ctx;
555 player->shut_down(&unwatch_ctx);
556 ASSERT_EQ(0, unwatch_ctx.wait());
557 };
558
559 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
560
561 player->prefetch_and_watch(0.25);
562
563 Entries entries;
564 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
565 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
566
567 Entries expected_entries;
568 expected_entries = {this->create_entry(234, 123)};
569 ASSERT_EQ(expected_entries, entries);
570
571 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
572 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
573
574 expected_entries = {this->create_entry(234, 124)};
575 ASSERT_EQ(expected_entries, entries);
576}
577
578TYPED_TEST(TestJournalPlayer, PrefetchSkippedObject) {
579 std::string oid = this->get_temp_oid();
580
581 cls::journal::ObjectSetPosition commit_position;
582
583 ASSERT_EQ(0, this->create(oid, 14, 3));
584 ASSERT_EQ(0, this->client_register(oid));
585 ASSERT_EQ(0, this->client_commit(oid, commit_position));
586
9f95a23c 587 auto metadata = this->create_metadata(oid);
7c673cae
FG
588 ASSERT_EQ(0, this->init_metadata(metadata));
589 ASSERT_EQ(0, metadata->set_active_set(2));
590
591 journal::JournalPlayer *player = this->create_player(oid, metadata);
592 BOOST_SCOPE_EXIT_ALL( (player) ) {
593 C_SaferCond unwatch_ctx;
594 player->shut_down(&unwatch_ctx);
595 ASSERT_EQ(0, unwatch_ctx.wait());
596 };
597
598 ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
599 ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
600 ASSERT_EQ(0, this->write_entry(oid, 5, 234, 124));
601 ASSERT_EQ(0, this->write_entry(oid, 6, 234, 125));
602 ASSERT_EQ(0, this->write_entry(oid, 7, 234, 126));
603
604 player->prefetch();
605
606 Entries entries;
607 ASSERT_TRUE(this->wait_for_entries(player, 5, &entries));
608 ASSERT_TRUE(this->wait_for_complete(player));
609
610 Entries expected_entries;
611 expected_entries = {
612 this->create_entry(234, 122),
613 this->create_entry(234, 123),
614 this->create_entry(234, 124),
615 this->create_entry(234, 125),
616 this->create_entry(234, 126)};
617 ASSERT_EQ(expected_entries, entries);
618
619 uint64_t last_tid;
620 ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
621 ASSERT_EQ(126U, last_tid);
622}
623
624TYPED_TEST(TestJournalPlayer, ImbalancedJournal) {
625 std::string oid = this->get_temp_oid();
626
627 journal::JournalPlayer::ObjectPositions positions = {
628 cls::journal::ObjectPosition(9, 300, 1),
629 cls::journal::ObjectPosition(8, 300, 0),
630 cls::journal::ObjectPosition(10, 200, 4334),
631 cls::journal::ObjectPosition(11, 200, 4331) };
632 cls::journal::ObjectSetPosition commit_position(positions);
633
634 ASSERT_EQ(0, this->create(oid, 14, 4));
635 ASSERT_EQ(0, this->client_register(oid));
636 ASSERT_EQ(0, this->client_commit(oid, commit_position));
637
9f95a23c 638 auto metadata = this->create_metadata(oid);
7c673cae
FG
639 ASSERT_EQ(0, this->init_metadata(metadata));
640 ASSERT_EQ(0, metadata->set_active_set(2));
641 metadata->set_minimum_set(2);
642
643 journal::JournalPlayer *player = this->create_player(oid, metadata);
644 BOOST_SCOPE_EXIT_ALL( (player) ) {
645 C_SaferCond unwatch_ctx;
646 player->shut_down(&unwatch_ctx);
647 ASSERT_EQ(0, unwatch_ctx.wait());
648 };
649
650 ASSERT_EQ(0, this->write_entry(oid, 8, 300, 0));
651 ASSERT_EQ(0, this->write_entry(oid, 8, 301, 0));
652 ASSERT_EQ(0, this->write_entry(oid, 9, 300, 1));
653 ASSERT_EQ(0, this->write_entry(oid, 9, 301, 1));
654 ASSERT_EQ(0, this->write_entry(oid, 10, 200, 4334));
655 ASSERT_EQ(0, this->write_entry(oid, 10, 301, 2));
656 ASSERT_EQ(0, this->write_entry(oid, 11, 200, 4331));
657 ASSERT_EQ(0, this->write_entry(oid, 11, 301, 3));
658
659 player->prefetch();
660
661 Entries entries;
662 ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
663 ASSERT_TRUE(this->wait_for_complete(player));
664
665 Entries expected_entries;
666 expected_entries = {
667 this->create_entry(301, 0),
668 this->create_entry(301, 1),
669 this->create_entry(301, 2),
670 this->create_entry(301, 3)};
671 ASSERT_EQ(expected_entries, entries);
672
673 uint64_t last_tid;
674 ASSERT_TRUE(metadata->get_last_allocated_entry_tid(301, &last_tid));
675 ASSERT_EQ(3U, last_tid);
676}
677
678TYPED_TEST(TestJournalPlayer, LiveReplayLaggyAppend) {
679 std::string oid = this->get_temp_oid();
680
681 cls::journal::ObjectSetPosition commit_position;
682
683 ASSERT_EQ(0, this->create(oid));
684 ASSERT_EQ(0, this->client_register(oid));
685 ASSERT_EQ(0, this->client_commit(oid, commit_position));
686
9f95a23c 687 auto metadata = this->create_metadata(oid);
7c673cae
FG
688 ASSERT_EQ(0, this->init_metadata(metadata));
689
690 journal::JournalPlayer *player = this->create_player(oid, metadata);
691 BOOST_SCOPE_EXIT_ALL( (player) ) {
692 C_SaferCond unwatch_ctx;
693 player->shut_down(&unwatch_ctx);
694 ASSERT_EQ(0, unwatch_ctx.wait());
695 };
696
697 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
698 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
699 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2));
700 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4));
701 ASSERT_EQ(0, this->write_entry(oid, 3, 0, 5)); // laggy entry 0/3 in object 1
702 player->prefetch_and_watch(0.25);
703
704 Entries entries;
705 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
706
707 Entries expected_entries = {
708 this->create_entry(0, 0),
709 this->create_entry(0, 1),
710 this->create_entry(0, 2)};
711 ASSERT_EQ(expected_entries, entries);
712
713 journal::Entry entry;
714 uint64_t commit_tid;
715 ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
716
717 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
718 ASSERT_EQ(0, metadata->set_active_set(1));
719 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
720
721 expected_entries = {
722 this->create_entry(0, 3),
723 this->create_entry(0, 4),
724 this->create_entry(0, 5)};
725 ASSERT_EQ(expected_entries, entries);
726}
727
728TYPED_TEST(TestJournalPlayer, LiveReplayMissingSequence) {
729 std::string oid = this->get_temp_oid();
730
731 cls::journal::ObjectSetPosition commit_position;
732
733 ASSERT_EQ(0, this->create(oid, 14, 4));
734 ASSERT_EQ(0, this->client_register(oid));
735 ASSERT_EQ(0, this->client_commit(oid, commit_position));
736
9f95a23c 737 auto metadata = this->create_metadata(oid);
7c673cae
FG
738 ASSERT_EQ(0, this->init_metadata(metadata));
739
740 journal::JournalPlayer *player = this->create_player(oid, metadata);
741 BOOST_SCOPE_EXIT_ALL( (player) ) {
742 C_SaferCond unwatch_ctx;
743 player->shut_down(&unwatch_ctx);
744 ASSERT_EQ(0, unwatch_ctx.wait());
745 };
746
747 ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852));
748 ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
749 ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860));
750 ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853));
751 ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857));
752 ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854));
753 ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
754 player->prefetch_and_watch(0.25);
755
756 Entries entries;
757 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
758
759 Entries expected_entries = {
760 this->create_entry(2, 852),
761 this->create_entry(2, 853),
762 this->create_entry(2, 854)};
763 ASSERT_EQ(expected_entries, entries);
764
765 journal::Entry entry;
766 uint64_t commit_tid;
767 ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
768
769 ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3));
770 ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2));
771 ASSERT_EQ(0, this->write_entry(oid, 1, 3, 1));
772 ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0));
773 ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
774
775 expected_entries = {
776 this->create_entry(3, 0),
777 this->create_entry(3, 1),
778 this->create_entry(3, 2),
779 this->create_entry(3, 3)};
780 ASSERT_EQ(expected_entries, entries);
781}
782
783TYPED_TEST(TestJournalPlayer, LiveReplayLargeMissingSequence) {
784 std::string oid = this->get_temp_oid();
785
786 cls::journal::ObjectSetPosition commit_position;
787
788 ASSERT_EQ(0, this->create(oid));
789 ASSERT_EQ(0, this->client_register(oid));
790 ASSERT_EQ(0, this->client_commit(oid, commit_position));
791
9f95a23c 792 auto metadata = this->create_metadata(oid);
7c673cae
FG
793 ASSERT_EQ(0, this->init_metadata(metadata));
794
795 journal::JournalPlayer *player = this->create_player(oid, metadata);
796 BOOST_SCOPE_EXIT_ALL( (player) ) {
797 C_SaferCond unwatch_ctx;
798 player->shut_down(&unwatch_ctx);
799 ASSERT_EQ(0, unwatch_ctx.wait());
800 };
801
802 ASSERT_EQ(0, metadata->set_active_set(2));
803 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
804 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
805 ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
806 ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0));
807 player->prefetch_and_watch(0.25);
808
809 Entries entries;
810 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
811
812 Entries expected_entries = {
813 this->create_entry(0, 0),
814 this->create_entry(0, 1),
815 this->create_entry(1, 0)};
816 ASSERT_EQ(expected_entries, entries);
817}
818
819TYPED_TEST(TestJournalPlayer, LiveReplayBlockedNewTag) {
820 std::string oid = this->get_temp_oid();
821
822 cls::journal::ObjectSetPosition commit_position;
823
824 ASSERT_EQ(0, this->create(oid));
825 ASSERT_EQ(0, this->client_register(oid));
826 ASSERT_EQ(0, this->client_commit(oid, commit_position));
827
9f95a23c 828 auto metadata = this->create_metadata(oid);
7c673cae
FG
829 ASSERT_EQ(0, this->init_metadata(metadata));
830
831 journal::JournalPlayer *player = this->create_player(oid, metadata);
832 BOOST_SCOPE_EXIT_ALL( (player) ) {
833 C_SaferCond unwatch_ctx;
834 player->shut_down(&unwatch_ctx);
835 ASSERT_EQ(0, unwatch_ctx.wait());
836 };
837
838 C_SaferCond ctx1;
839 cls::journal::Tag tag1;
840 metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1);
841 ASSERT_EQ(0, ctx1.wait());
842
843 ASSERT_EQ(0, metadata->set_active_set(0));
844 ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 0));
845 ASSERT_EQ(0, this->write_entry(oid, 1, tag1.tid, 1));
846 ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 2));
847 ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 4));
848 player->prefetch_and_watch(0.25);
849
850 Entries entries;
851 ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
852
853 Entries expected_entries = {
854 this->create_entry(tag1.tid, 0),
855 this->create_entry(tag1.tid, 1),
856 this->create_entry(tag1.tid, 2)};
857 ASSERT_EQ(expected_entries, entries);
858
859 journal::Entry entry;
860 uint64_t commit_tid;
861 ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
862
863 C_SaferCond ctx2;
864 cls::journal::Tag tag2;
865 metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx2);
866 ASSERT_EQ(0, ctx2.wait());
867
868 ASSERT_EQ(0, this->write_entry(oid, 0, tag2.tid, 0));
869 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
870
871 expected_entries = {
872 this->create_entry(tag2.tid, 0)};
873 ASSERT_EQ(expected_entries, entries);
874}
875
876TYPED_TEST(TestJournalPlayer, LiveReplayStaleEntries) {
877 std::string oid = this->get_temp_oid();
878
879 journal::JournalPlayer::ObjectPositions positions = {
880 cls::journal::ObjectPosition(0, 1, 0) };
881 cls::journal::ObjectSetPosition commit_position(positions);
882
883 ASSERT_EQ(0, this->create(oid));
884 ASSERT_EQ(0, this->client_register(oid));
885 ASSERT_EQ(0, this->client_commit(oid, commit_position));
886
9f95a23c 887 auto metadata = this->create_metadata(oid);
7c673cae
FG
888 ASSERT_EQ(0, this->init_metadata(metadata));
889
890 journal::JournalPlayer *player = this->create_player(oid, metadata);
891 BOOST_SCOPE_EXIT_ALL( (player) ) {
892 C_SaferCond unwatch_ctx;
893 player->shut_down(&unwatch_ctx);
894 ASSERT_EQ(0, unwatch_ctx.wait());
895 };
896
897 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
898 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
899 ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
900 ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1));
901 player->prefetch_and_watch(0.25);
902
903 Entries entries;
904 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
905
906 Entries expected_entries = {
907 this->create_entry(1, 1)};
908 ASSERT_EQ(expected_entries, entries);
909}
910
911TYPED_TEST(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
912 std::string oid = this->get_temp_oid();
913
914 journal::JournalPlayer::ObjectPositions positions = {
915 cls::journal::ObjectPosition(1, 0, 1),
916 cls::journal::ObjectPosition(0, 0, 0)};
917 cls::journal::ObjectSetPosition commit_position(positions);
918
919 ASSERT_EQ(0, this->create(oid));
920 ASSERT_EQ(0, this->client_register(oid));
921 ASSERT_EQ(0, this->client_commit(oid, commit_position));
922
9f95a23c 923 auto metadata = this->create_metadata(oid);
7c673cae
FG
924 ASSERT_EQ(0, this->init_metadata(metadata));
925
926 journal::JournalPlayer *player = this->create_player(oid, metadata);
927 BOOST_SCOPE_EXIT_ALL( (player) ) {
928 C_SaferCond unwatch_ctx;
929 player->shut_down(&unwatch_ctx);
930 ASSERT_EQ(0, unwatch_ctx.wait());
931 };
932
933 ASSERT_EQ(0, metadata->set_active_set(1));
934 ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
935 ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
936 ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
937 ASSERT_EQ(0, this->write_entry(oid, 2, 1, 0));
938 player->prefetch_and_watch(0.25);
939
940 Entries entries;
941 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
942
943 Entries expected_entries = {
944 this->create_entry(1, 0)};
945 ASSERT_EQ(expected_entries, entries);
946
947 // should remove player for offset 3 after refetching
948 ASSERT_EQ(0, metadata->set_active_set(3));
949 ASSERT_EQ(0, this->write_entry(oid, 7, 1, 1));
950
951 ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
952
953 expected_entries = {
954 this->create_entry(1, 1)};
955 ASSERT_EQ(expected_entries, entries);
956}
957
958TYPED_TEST(TestJournalPlayer, PrefechShutDown) {
959 std::string oid = this->get_temp_oid();
960
961 ASSERT_EQ(0, this->create(oid));
962 ASSERT_EQ(0, this->client_register(oid));
963 ASSERT_EQ(0, this->client_commit(oid, {}));
964
9f95a23c 965 auto metadata = this->create_metadata(oid);
7c673cae
FG
966 ASSERT_EQ(0, this->init_metadata(metadata));
967
968 journal::JournalPlayer *player = this->create_player(oid, metadata);
969 BOOST_SCOPE_EXIT_ALL( (player) ) {
970 C_SaferCond unwatch_ctx;
971 player->shut_down(&unwatch_ctx);
972 ASSERT_EQ(0, unwatch_ctx.wait());
973 };
974 player->prefetch();
975}
976
977TYPED_TEST(TestJournalPlayer, LiveReplayShutDown) {
978 std::string oid = this->get_temp_oid();
979
980 ASSERT_EQ(0, this->create(oid));
981 ASSERT_EQ(0, this->client_register(oid));
982 ASSERT_EQ(0, this->client_commit(oid, {}));
983
9f95a23c 984 auto metadata = this->create_metadata(oid);
7c673cae
FG
985 ASSERT_EQ(0, this->init_metadata(metadata));
986
987 journal::JournalPlayer *player = this->create_player(oid, metadata);
988 BOOST_SCOPE_EXIT_ALL( (player) ) {
989 C_SaferCond unwatch_ctx;
990 player->shut_down(&unwatch_ctx);
991 ASSERT_EQ(0, unwatch_ctx.wait());
992 };
993 player->prefetch_and_watch(0.25);
994}
995