git.proxmox.com Git - ceph.git/blob - ceph/src/test/immutable_object_cache/test_DomainSocket.cc
Import ceph 15.2.8
[ceph.git] / ceph / src / test / immutable_object_cache / test_DomainSocket.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <iostream>
5 #include <unistd.h>
6
7 #include "gtest/gtest.h"
8 #include "include/Context.h"
9 #include "global/global_init.h"
10 #include "global/global_context.h"
11
12 #include "test/immutable_object_cache/test_common.h"
13 #include "tools/immutable_object_cache/CacheClient.h"
14 #include "tools/immutable_object_cache/CacheServer.h"
15
16 using namespace ceph::immutable_obj_cache;
17
18 class TestCommunication :public ::testing::Test {
19 public:
20 CacheServer* m_cache_server;
21 std::thread* srv_thd;
22 CacheClient* m_cache_client;
23 std::string m_local_path;
24 pthread_mutex_t m_mutex;
25 pthread_cond_t m_cond;
26 std::atomic<uint64_t> m_send_request_index;
27 std::atomic<uint64_t> m_recv_ack_index;
28 WaitEvent m_wait_event;
29 unordered_set<std::string> m_hit_entry_set;
30
31 TestCommunication()
32 : m_cache_server(nullptr), m_cache_client(nullptr),
33 m_local_path("/tmp/ceph_test_domain_socket"),
34 m_send_request_index(0), m_recv_ack_index(0)
35 {}
36
37 ~TestCommunication() {}
38
39 static void SetUpTestCase() {}
40 static void TearDownTestCase() {}
41
42 void SetUp() override {
43 std::remove(m_local_path.c_str());
44 m_cache_server = new CacheServer(g_ceph_context, m_local_path,
45 [this](CacheSession* sid, ObjectCacheRequest* req){
46 handle_request(sid, req);
47 });
48 ASSERT_TRUE(m_cache_server != nullptr);
49 srv_thd = new std::thread([this]() {m_cache_server->run();});
50
51 m_cache_client = new CacheClient(m_local_path, g_ceph_context);
52 ASSERT_TRUE(m_cache_client != nullptr);
53 m_cache_client->run();
54
55 while (true) {
56 if (0 == m_cache_client->connect()) {
57 break;
58 }
59 }
60
61 auto ctx = new LambdaContext([](int reg) {
62 ASSERT_TRUE(reg == 0);
63 });
64 m_cache_client->register_client(ctx);
65 ASSERT_TRUE(m_cache_client->is_session_work());
66 }
67
68 void TearDown() override {
69
70 delete m_cache_client;
71 m_cache_server->stop();
72 if (srv_thd->joinable()) {
73 srv_thd->join();
74 }
75 delete m_cache_server;
76 std::remove(m_local_path.c_str());
77 delete srv_thd;
78 }
79
80 void handle_request(CacheSession* session_id, ObjectCacheRequest* req) {
81
82 switch (req->get_request_type()) {
83 case RBDSC_REGISTER: {
84 ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, req->seq);
85 session_id->send(reply);
86 break;
87 }
88 case RBDSC_READ: {
89 ObjectCacheReadData* read_req = (ObjectCacheReadData*)req;
90 ObjectCacheRequest* reply = nullptr;
91 if (m_hit_entry_set.find(read_req->oid) == m_hit_entry_set.end()) {
92 reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq);
93 } else {
94 reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, req->seq, "/fakepath");
95 }
96 session_id->send(reply);
97 break;
98 }
99 }
100 }
101
102 // times: message number
103 // queue_depth : imitate message queue depth
104 // thinking : imitate handing message time
105 void startup_pingpong_testing(uint64_t times, uint64_t queue_depth, int thinking) {
106 m_send_request_index.store(0);
107 m_recv_ack_index.store(0);
108 for (uint64_t index = 0; index < times; index++) {
109 auto ctx = make_gen_lambda_context<ObjectCacheRequest*, std::function<void(ObjectCacheRequest*)>>
110 ([this, thinking, times](ObjectCacheRequest* ack){
111 if (thinking != 0) {
112 usleep(thinking); // handling message
113 }
114 m_recv_ack_index++;
115 if (m_recv_ack_index == times) {
116 m_wait_event.signal();
117 }
118 });
119
120 // simple queue depth
121 while (m_send_request_index - m_recv_ack_index > queue_depth) {
122 usleep(1);
123 }
124
125 m_cache_client->lookup_object("pool_nspace", 1, 2, "object_name", std::move(ctx));
126 m_send_request_index++;
127 }
128 m_wait_event.wait();
129 }
130
131 bool startup_lookupobject_testing(std::string pool_nspace, std::string object_id) {
132 bool hit;
133 auto ctx = make_gen_lambda_context<ObjectCacheRequest*, std::function<void(ObjectCacheRequest*)>>
134 ([this, &hit](ObjectCacheRequest* ack){
135 hit = ack->type == RBDSC_READ_REPLY;
136 m_wait_event.signal();
137 });
138 m_cache_client->lookup_object(pool_nspace, 1, 2, object_id, std::move(ctx));
139 m_wait_event.wait();
140 return hit;
141 }
142
143 void set_hit_entry_in_fake_lru(std::string cache_file_name) {
144 if (m_hit_entry_set.find(cache_file_name) == m_hit_entry_set.end()) {
145 m_hit_entry_set.insert(cache_file_name);
146 }
147 }
148 };
149
150 TEST_F(TestCommunication, test_pingpong) {
151
152 startup_pingpong_testing(64, 16, 0);
153 ASSERT_TRUE(m_send_request_index == m_recv_ack_index);
154 startup_pingpong_testing(200, 128, 0);
155 ASSERT_TRUE(m_send_request_index == m_recv_ack_index);
156 }
157
158 TEST_F(TestCommunication, test_lookup_object) {
159
160 m_hit_entry_set.clear();
161
162 srand(time(0));
163 uint64_t random_hit = random();
164
165 for (uint64_t i = 50; i < 100; i++) {
166 if ((random_hit % i) == 0) {
167 set_hit_entry_in_fake_lru(std::to_string(i));
168 }
169 }
170 for (uint64_t i = 50; i < 100; i++) {
171 if ((random_hit % i) != 0) {
172 ASSERT_FALSE(startup_lookupobject_testing("test_nspace", std::to_string(i)));
173 } else {
174 ASSERT_TRUE(startup_lookupobject_testing("test_nspace", std::to_string(i)));
175 }
176 }
177 }