]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/immutable_object_cache/test_multi_session.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / immutable_object_cache / test_multi_session.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3#include <iostream>
4#include <unistd.h>
5
6#include "gtest/gtest.h"
7#include "include/Context.h"
8#include "global/global_init.h"
9#include "global/global_context.h"
10
11#include "test/immutable_object_cache/test_common.h"
12#include "tools/immutable_object_cache/CacheClient.h"
13#include "tools/immutable_object_cache/CacheServer.h"
14
20effc67 15using namespace std;
9f95a23c
TL
16using namespace ceph::immutable_obj_cache;
17
18class TestMultiSession : public ::testing::Test {
19public:
20 std::string m_local_path;
21 CacheServer* m_cache_server;
22 std::thread* m_cache_server_thread;
23 std::vector<CacheClient*> m_cache_client_vec;
24 WaitEvent m_wait_event;
25 std::atomic<uint64_t> m_send_request_index;
26 std::atomic<uint64_t> m_recv_ack_index;
27 uint64_t m_session_num = 110;
28
29 TestMultiSession() : m_local_path("/tmp/ceph_test_multisession_socket"),
30 m_cache_server_thread(nullptr), m_send_request_index(0),
31 m_recv_ack_index(0) {
32 m_cache_client_vec.resize(m_session_num + 1, nullptr);
33 }
34
35 ~TestMultiSession() {}
36
37 static void SetUpTestCase() {}
38 static void TearDownTestCase() {}
39
40 void SetUp() override {
41 std::remove(m_local_path.c_str());
42 m_cache_server = new CacheServer(g_ceph_context, m_local_path,
43 [this](CacheSession* session_id, ObjectCacheRequest* req){
44 server_handle_request(session_id, req);
45 });
46 ASSERT_TRUE(m_cache_server != nullptr);
47
48 m_cache_server_thread = new std::thread(([this]() {
49 m_wait_event.signal();
50 m_cache_server->run();
51 }));
52
53 // waiting for thread running.
54 m_wait_event.wait();
55
56 // waiting for io_service run.
57 usleep(2);
58 }
59
60 void TearDown() override {
61 for (uint64_t i = 0; i < m_session_num; i++) {
62 if (m_cache_client_vec[i] != nullptr) {
63 m_cache_client_vec[i]->close();
64 delete m_cache_client_vec[i];
65 }
66 }
67 m_cache_server->stop();
68 if (m_cache_server_thread->joinable()) {
69 m_cache_server_thread->join();
70 }
71 delete m_cache_server;
72 delete m_cache_server_thread;
73
74 std::remove(m_local_path.c_str());
75 }
76
77 CacheClient* create_session(uint64_t random_index) {
78 CacheClient* cache_client = new CacheClient(m_local_path, g_ceph_context);
79 cache_client->run();
80 while (true) {
81 if (0 == cache_client->connect()) {
82 break;
83 }
84 }
85 m_cache_client_vec[random_index] = cache_client;
86 return cache_client;
87 }
88
89 void server_handle_request(CacheSession* session_id, ObjectCacheRequest* req) {
90
91 switch (req->get_request_type()) {
92 case RBDSC_REGISTER: {
93 ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY,
94 req->seq);
95 session_id->send(reply);
96 break;
97 }
98 case RBDSC_READ: {
99 ObjectCacheRequest* reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY,
100 req->seq);
101 session_id->send(reply);
102 break;
103 }
104 }
105 }
106
107 void test_register_client(uint64_t random_index) {
108 ASSERT_TRUE(m_cache_client_vec[random_index] == nullptr);
109
110 auto ctx = new LambdaContext([](int ret){
111 ASSERT_TRUE(ret == 0);
112 });
113 auto session = create_session(random_index);
114 session->register_client(ctx);
115
116 ASSERT_TRUE(m_cache_client_vec[random_index] != nullptr);
117 ASSERT_TRUE(session->is_session_work());
118 }
119
120 void test_lookup_object(std::string pool_nspace, uint64_t index,
121 uint64_t request_num, bool is_last) {
122
123 for (uint64_t i = 0; i < request_num; i++) {
124 auto ctx = make_gen_lambda_context<ObjectCacheRequest*,
125 std::function<void(ObjectCacheRequest*)>>([this](ObjectCacheRequest* ack) {
126 m_recv_ack_index++;
127 });
128 m_send_request_index++;
129 // here just for concurrently testing register + lookup, so fix object id.
f67539c2 130 m_cache_client_vec[index]->lookup_object(pool_nspace, 1, 2, 3, "1234", std::move(ctx));
9f95a23c
TL
131 }
132
133 if (is_last) {
134 while(m_send_request_index != m_recv_ack_index) {
135 usleep(1);
136 }
137 m_wait_event.signal();
138 }
139 }
140};
141
142// test concurrent : multi-session + register_client + lookup_request
143TEST_F(TestMultiSession, test_multi_session) {
144
145 uint64_t test_times = 1000;
146 uint64_t test_session_num = 100;
147
148 for (uint64_t i = 0; i <= test_times; i++) {
149 uint64_t random_index = random() % test_session_num;
150 if (m_cache_client_vec[random_index] == nullptr) {
151 test_register_client(random_index);
152 } else {
153 test_lookup_object(string("test_nspace") + std::to_string(random_index),
154 random_index, 4, i == test_times ? true : false);
155 }
156 }
157
158 // make sure all ack will be received.
159 m_wait_event.wait();
160
161 ASSERT_TRUE(m_send_request_index == m_recv_ack_index);
162}