]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/immutable_object_cache/test_multi_session.cc
buildsys: switch source download to quincy
[ceph.git] / ceph / src / test / immutable_object_cache / test_multi_session.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3#include <iostream>
4#include <unistd.h>
5
6#include "gtest/gtest.h"
7#include "include/Context.h"
8#include "global/global_init.h"
9#include "global/global_context.h"
10
11#include "test/immutable_object_cache/test_common.h"
12#include "tools/immutable_object_cache/CacheClient.h"
13#include "tools/immutable_object_cache/CacheServer.h"
14
15using namespace ceph::immutable_obj_cache;
16
17class TestMultiSession : public ::testing::Test {
18public:
19 std::string m_local_path;
20 CacheServer* m_cache_server;
21 std::thread* m_cache_server_thread;
22 std::vector<CacheClient*> m_cache_client_vec;
23 WaitEvent m_wait_event;
24 std::atomic<uint64_t> m_send_request_index;
25 std::atomic<uint64_t> m_recv_ack_index;
26 uint64_t m_session_num = 110;
27
28 TestMultiSession() : m_local_path("/tmp/ceph_test_multisession_socket"),
29 m_cache_server_thread(nullptr), m_send_request_index(0),
30 m_recv_ack_index(0) {
31 m_cache_client_vec.resize(m_session_num + 1, nullptr);
32 }
33
34 ~TestMultiSession() {}
35
36 static void SetUpTestCase() {}
37 static void TearDownTestCase() {}
38
39 void SetUp() override {
40 std::remove(m_local_path.c_str());
41 m_cache_server = new CacheServer(g_ceph_context, m_local_path,
42 [this](CacheSession* session_id, ObjectCacheRequest* req){
43 server_handle_request(session_id, req);
44 });
45 ASSERT_TRUE(m_cache_server != nullptr);
46
47 m_cache_server_thread = new std::thread(([this]() {
48 m_wait_event.signal();
49 m_cache_server->run();
50 }));
51
52 // waiting for thread running.
53 m_wait_event.wait();
54
55 // waiting for io_service run.
56 usleep(2);
57 }
58
59 void TearDown() override {
60 for (uint64_t i = 0; i < m_session_num; i++) {
61 if (m_cache_client_vec[i] != nullptr) {
62 m_cache_client_vec[i]->close();
63 delete m_cache_client_vec[i];
64 }
65 }
66 m_cache_server->stop();
67 if (m_cache_server_thread->joinable()) {
68 m_cache_server_thread->join();
69 }
70 delete m_cache_server;
71 delete m_cache_server_thread;
72
73 std::remove(m_local_path.c_str());
74 }
75
76 CacheClient* create_session(uint64_t random_index) {
77 CacheClient* cache_client = new CacheClient(m_local_path, g_ceph_context);
78 cache_client->run();
79 while (true) {
80 if (0 == cache_client->connect()) {
81 break;
82 }
83 }
84 m_cache_client_vec[random_index] = cache_client;
85 return cache_client;
86 }
87
88 void server_handle_request(CacheSession* session_id, ObjectCacheRequest* req) {
89
90 switch (req->get_request_type()) {
91 case RBDSC_REGISTER: {
92 ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY,
93 req->seq);
94 session_id->send(reply);
95 break;
96 }
97 case RBDSC_READ: {
98 ObjectCacheRequest* reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY,
99 req->seq);
100 session_id->send(reply);
101 break;
102 }
103 }
104 }
105
106 void test_register_client(uint64_t random_index) {
107 ASSERT_TRUE(m_cache_client_vec[random_index] == nullptr);
108
109 auto ctx = new LambdaContext([](int ret){
110 ASSERT_TRUE(ret == 0);
111 });
112 auto session = create_session(random_index);
113 session->register_client(ctx);
114
115 ASSERT_TRUE(m_cache_client_vec[random_index] != nullptr);
116 ASSERT_TRUE(session->is_session_work());
117 }
118
119 void test_lookup_object(std::string pool_nspace, uint64_t index,
120 uint64_t request_num, bool is_last) {
121
122 for (uint64_t i = 0; i < request_num; i++) {
123 auto ctx = make_gen_lambda_context<ObjectCacheRequest*,
124 std::function<void(ObjectCacheRequest*)>>([this](ObjectCacheRequest* ack) {
125 m_recv_ack_index++;
126 });
127 m_send_request_index++;
128 // here just for concurrently testing register + lookup, so fix object id.
f67539c2 129 m_cache_client_vec[index]->lookup_object(pool_nspace, 1, 2, 3, "1234", std::move(ctx));
9f95a23c
TL
130 }
131
132 if (is_last) {
133 while(m_send_request_index != m_recv_ack_index) {
134 usleep(1);
135 }
136 m_wait_event.signal();
137 }
138 }
139};
140
141// test concurrent : multi-session + register_client + lookup_request
142TEST_F(TestMultiSession, test_multi_session) {
143
144 uint64_t test_times = 1000;
145 uint64_t test_session_num = 100;
146
147 for (uint64_t i = 0; i <= test_times; i++) {
148 uint64_t random_index = random() % test_session_num;
149 if (m_cache_client_vec[random_index] == nullptr) {
150 test_register_client(random_index);
151 } else {
152 test_lookup_object(string("test_nspace") + std::to_string(random_index),
153 random_index, 4, i == test_times ? true : false);
154 }
155 }
156
157 // make sure all ack will be received.
158 m_wait_event.wait();
159
160 ASSERT_TRUE(m_send_request_index == m_recv_ack_index);
161}