]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | #include <iostream> | |
4 | #include <unistd.h> | |
5 | ||
6 | #include "gtest/gtest.h" | |
7 | #include "include/Context.h" | |
8 | #include "global/global_init.h" | |
9 | #include "global/global_context.h" | |
10 | ||
11 | #include "test/immutable_object_cache/test_common.h" | |
12 | #include "tools/immutable_object_cache/CacheClient.h" | |
13 | #include "tools/immutable_object_cache/CacheServer.h" | |
14 | ||
15 | using namespace ceph::immutable_obj_cache; | |
16 | ||
17 | class TestMultiSession : public ::testing::Test { | |
18 | public: | |
19 | std::string m_local_path; | |
20 | CacheServer* m_cache_server; | |
21 | std::thread* m_cache_server_thread; | |
22 | std::vector<CacheClient*> m_cache_client_vec; | |
23 | WaitEvent m_wait_event; | |
24 | std::atomic<uint64_t> m_send_request_index; | |
25 | std::atomic<uint64_t> m_recv_ack_index; | |
26 | uint64_t m_session_num = 110; | |
27 | ||
28 | TestMultiSession() : m_local_path("/tmp/ceph_test_multisession_socket"), | |
29 | m_cache_server_thread(nullptr), m_send_request_index(0), | |
30 | m_recv_ack_index(0) { | |
31 | m_cache_client_vec.resize(m_session_num + 1, nullptr); | |
32 | } | |
33 | ||
34 | ~TestMultiSession() {} | |
35 | ||
36 | static void SetUpTestCase() {} | |
37 | static void TearDownTestCase() {} | |
38 | ||
39 | void SetUp() override { | |
40 | std::remove(m_local_path.c_str()); | |
41 | m_cache_server = new CacheServer(g_ceph_context, m_local_path, | |
42 | [this](CacheSession* session_id, ObjectCacheRequest* req){ | |
43 | server_handle_request(session_id, req); | |
44 | }); | |
45 | ASSERT_TRUE(m_cache_server != nullptr); | |
46 | ||
47 | m_cache_server_thread = new std::thread(([this]() { | |
48 | m_wait_event.signal(); | |
49 | m_cache_server->run(); | |
50 | })); | |
51 | ||
52 | // waiting for thread running. | |
53 | m_wait_event.wait(); | |
54 | ||
55 | // waiting for io_service run. | |
56 | usleep(2); | |
57 | } | |
58 | ||
59 | void TearDown() override { | |
60 | for (uint64_t i = 0; i < m_session_num; i++) { | |
61 | if (m_cache_client_vec[i] != nullptr) { | |
62 | m_cache_client_vec[i]->close(); | |
63 | delete m_cache_client_vec[i]; | |
64 | } | |
65 | } | |
66 | m_cache_server->stop(); | |
67 | if (m_cache_server_thread->joinable()) { | |
68 | m_cache_server_thread->join(); | |
69 | } | |
70 | delete m_cache_server; | |
71 | delete m_cache_server_thread; | |
72 | ||
73 | std::remove(m_local_path.c_str()); | |
74 | } | |
75 | ||
76 | CacheClient* create_session(uint64_t random_index) { | |
77 | CacheClient* cache_client = new CacheClient(m_local_path, g_ceph_context); | |
78 | cache_client->run(); | |
79 | while (true) { | |
80 | if (0 == cache_client->connect()) { | |
81 | break; | |
82 | } | |
83 | } | |
84 | m_cache_client_vec[random_index] = cache_client; | |
85 | return cache_client; | |
86 | } | |
87 | ||
88 | void server_handle_request(CacheSession* session_id, ObjectCacheRequest* req) { | |
89 | ||
90 | switch (req->get_request_type()) { | |
91 | case RBDSC_REGISTER: { | |
92 | ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, | |
93 | req->seq); | |
94 | session_id->send(reply); | |
95 | break; | |
96 | } | |
97 | case RBDSC_READ: { | |
98 | ObjectCacheRequest* reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, | |
99 | req->seq); | |
100 | session_id->send(reply); | |
101 | break; | |
102 | } | |
103 | } | |
104 | } | |
105 | ||
106 | void test_register_client(uint64_t random_index) { | |
107 | ASSERT_TRUE(m_cache_client_vec[random_index] == nullptr); | |
108 | ||
109 | auto ctx = new LambdaContext([](int ret){ | |
110 | ASSERT_TRUE(ret == 0); | |
111 | }); | |
112 | auto session = create_session(random_index); | |
113 | session->register_client(ctx); | |
114 | ||
115 | ASSERT_TRUE(m_cache_client_vec[random_index] != nullptr); | |
116 | ASSERT_TRUE(session->is_session_work()); | |
117 | } | |
118 | ||
119 | void test_lookup_object(std::string pool_nspace, uint64_t index, | |
120 | uint64_t request_num, bool is_last) { | |
121 | ||
122 | for (uint64_t i = 0; i < request_num; i++) { | |
123 | auto ctx = make_gen_lambda_context<ObjectCacheRequest*, | |
124 | std::function<void(ObjectCacheRequest*)>>([this](ObjectCacheRequest* ack) { | |
125 | m_recv_ack_index++; | |
126 | }); | |
127 | m_send_request_index++; | |
128 | // here just for concurrently testing register + lookup, so fix object id. | |
f67539c2 | 129 | m_cache_client_vec[index]->lookup_object(pool_nspace, 1, 2, 3, "1234", std::move(ctx)); |
9f95a23c TL |
130 | } |
131 | ||
132 | if (is_last) { | |
133 | while(m_send_request_index != m_recv_ack_index) { | |
134 | usleep(1); | |
135 | } | |
136 | m_wait_event.signal(); | |
137 | } | |
138 | } | |
139 | }; | |
140 | ||
141 | // test concurrent : multi-session + register_client + lookup_request | |
142 | TEST_F(TestMultiSession, test_multi_session) { | |
143 | ||
144 | uint64_t test_times = 1000; | |
145 | uint64_t test_session_num = 100; | |
146 | ||
147 | for (uint64_t i = 0; i <= test_times; i++) { | |
148 | uint64_t random_index = random() % test_session_num; | |
149 | if (m_cache_client_vec[random_index] == nullptr) { | |
150 | test_register_client(random_index); | |
151 | } else { | |
152 | test_lookup_object(string("test_nspace") + std::to_string(random_index), | |
153 | random_index, 4, i == test_times ? true : false); | |
154 | } | |
155 | } | |
156 | ||
157 | // make sure all ack will be received. | |
158 | m_wait_event.wait(); | |
159 | ||
160 | ASSERT_TRUE(m_send_request_index == m_recv_ack_index); | |
161 | } |