]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | #include <iostream> | |
4 | #include <unistd.h> | |
5 | ||
6 | #include "gtest/gtest.h" | |
7 | #include "include/Context.h" | |
8 | #include "global/global_init.h" | |
9 | #include "global/global_context.h" | |
10 | ||
11 | #include "test/immutable_object_cache/test_common.h" | |
12 | #include "tools/immutable_object_cache/CacheClient.h" | |
13 | #include "tools/immutable_object_cache/CacheServer.h" | |
14 | ||
20effc67 | 15 | using namespace std; |
9f95a23c TL |
16 | using namespace ceph::immutable_obj_cache; |
17 | ||
18 | class TestMultiSession : public ::testing::Test { | |
19 | public: | |
20 | std::string m_local_path; | |
21 | CacheServer* m_cache_server; | |
22 | std::thread* m_cache_server_thread; | |
23 | std::vector<CacheClient*> m_cache_client_vec; | |
24 | WaitEvent m_wait_event; | |
25 | std::atomic<uint64_t> m_send_request_index; | |
26 | std::atomic<uint64_t> m_recv_ack_index; | |
27 | uint64_t m_session_num = 110; | |
28 | ||
29 | TestMultiSession() : m_local_path("/tmp/ceph_test_multisession_socket"), | |
30 | m_cache_server_thread(nullptr), m_send_request_index(0), | |
31 | m_recv_ack_index(0) { | |
32 | m_cache_client_vec.resize(m_session_num + 1, nullptr); | |
33 | } | |
34 | ||
35 | ~TestMultiSession() {} | |
36 | ||
37 | static void SetUpTestCase() {} | |
38 | static void TearDownTestCase() {} | |
39 | ||
40 | void SetUp() override { | |
41 | std::remove(m_local_path.c_str()); | |
42 | m_cache_server = new CacheServer(g_ceph_context, m_local_path, | |
43 | [this](CacheSession* session_id, ObjectCacheRequest* req){ | |
44 | server_handle_request(session_id, req); | |
45 | }); | |
46 | ASSERT_TRUE(m_cache_server != nullptr); | |
47 | ||
48 | m_cache_server_thread = new std::thread(([this]() { | |
49 | m_wait_event.signal(); | |
50 | m_cache_server->run(); | |
51 | })); | |
52 | ||
53 | // waiting for thread running. | |
54 | m_wait_event.wait(); | |
55 | ||
56 | // waiting for io_service run. | |
57 | usleep(2); | |
58 | } | |
59 | ||
60 | void TearDown() override { | |
61 | for (uint64_t i = 0; i < m_session_num; i++) { | |
62 | if (m_cache_client_vec[i] != nullptr) { | |
63 | m_cache_client_vec[i]->close(); | |
64 | delete m_cache_client_vec[i]; | |
65 | } | |
66 | } | |
67 | m_cache_server->stop(); | |
68 | if (m_cache_server_thread->joinable()) { | |
69 | m_cache_server_thread->join(); | |
70 | } | |
71 | delete m_cache_server; | |
72 | delete m_cache_server_thread; | |
73 | ||
74 | std::remove(m_local_path.c_str()); | |
75 | } | |
76 | ||
77 | CacheClient* create_session(uint64_t random_index) { | |
78 | CacheClient* cache_client = new CacheClient(m_local_path, g_ceph_context); | |
79 | cache_client->run(); | |
80 | while (true) { | |
81 | if (0 == cache_client->connect()) { | |
82 | break; | |
83 | } | |
84 | } | |
85 | m_cache_client_vec[random_index] = cache_client; | |
86 | return cache_client; | |
87 | } | |
88 | ||
89 | void server_handle_request(CacheSession* session_id, ObjectCacheRequest* req) { | |
90 | ||
91 | switch (req->get_request_type()) { | |
92 | case RBDSC_REGISTER: { | |
93 | ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, | |
94 | req->seq); | |
95 | session_id->send(reply); | |
96 | break; | |
97 | } | |
98 | case RBDSC_READ: { | |
99 | ObjectCacheRequest* reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, | |
100 | req->seq); | |
101 | session_id->send(reply); | |
102 | break; | |
103 | } | |
104 | } | |
105 | } | |
106 | ||
107 | void test_register_client(uint64_t random_index) { | |
108 | ASSERT_TRUE(m_cache_client_vec[random_index] == nullptr); | |
109 | ||
110 | auto ctx = new LambdaContext([](int ret){ | |
111 | ASSERT_TRUE(ret == 0); | |
112 | }); | |
113 | auto session = create_session(random_index); | |
114 | session->register_client(ctx); | |
115 | ||
116 | ASSERT_TRUE(m_cache_client_vec[random_index] != nullptr); | |
117 | ASSERT_TRUE(session->is_session_work()); | |
118 | } | |
119 | ||
120 | void test_lookup_object(std::string pool_nspace, uint64_t index, | |
121 | uint64_t request_num, bool is_last) { | |
122 | ||
123 | for (uint64_t i = 0; i < request_num; i++) { | |
124 | auto ctx = make_gen_lambda_context<ObjectCacheRequest*, | |
125 | std::function<void(ObjectCacheRequest*)>>([this](ObjectCacheRequest* ack) { | |
126 | m_recv_ack_index++; | |
127 | }); | |
128 | m_send_request_index++; | |
129 | // here just for concurrently testing register + lookup, so fix object id. | |
f67539c2 | 130 | m_cache_client_vec[index]->lookup_object(pool_nspace, 1, 2, 3, "1234", std::move(ctx)); |
9f95a23c TL |
131 | } |
132 | ||
133 | if (is_last) { | |
134 | while(m_send_request_index != m_recv_ack_index) { | |
135 | usleep(1); | |
136 | } | |
137 | m_wait_event.signal(); | |
138 | } | |
139 | } | |
140 | }; | |
141 | ||
142 | // test concurrent : multi-session + register_client + lookup_request | |
143 | TEST_F(TestMultiSession, test_multi_session) { | |
144 | ||
145 | uint64_t test_times = 1000; | |
146 | uint64_t test_session_num = 100; | |
147 | ||
148 | for (uint64_t i = 0; i <= test_times; i++) { | |
149 | uint64_t random_index = random() % test_session_num; | |
150 | if (m_cache_client_vec[random_index] == nullptr) { | |
151 | test_register_client(random_index); | |
152 | } else { | |
153 | test_lookup_object(string("test_nspace") + std::to_string(random_index), | |
154 | random_index, 4, i == test_times ? true : false); | |
155 | } | |
156 | } | |
157 | ||
158 | // make sure all ack will be received. | |
159 | m_wait_event.wait(); | |
160 | ||
161 | ASSERT_TRUE(m_send_request_index == m_recv_ack_index); | |
162 | } |