]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_sync_trace.cc
ddcdea2495204cf2ec1238321d6313688a04b1fc
[ceph.git] / ceph / src / rgw / rgw_sync_trace.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef CEPH_RGW_SYNC_TRACE_H
5 #define CEPH_RGW_SYNC_TRACE_H
6
7 #include <regex>
8
9 #include "common/debug.h"
10 #include "common/ceph_json.h"
11
12 #include "rgw_sync_trace.h"
13 #include "rgw_rados.h"
14 #include "rgw_worker.h"
15
16
17 #define dout_context g_ceph_context
18 #define dout_subsys ceph_subsys_rgw_sync
19
20 RGWSyncTraceNode::RGWSyncTraceNode(CephContext *_cct, uint64_t _handle,
21 const RGWSyncTraceNodeRef& _parent,
22 const string& _type, const string& _id) : cct(_cct),
23 parent(_parent),
24 type(_type),
25 id(_id),
26 handle(_handle),
27 history(cct->_conf->rgw_sync_trace_per_node_log_size)
28 {
29 if (parent.get()) {
30 prefix = parent->get_prefix();
31 }
32
33 if (!type.empty()) {
34 prefix += type;
35 if (!id.empty()) {
36 prefix += "[" + id + "]";
37 }
38 prefix += ":";
39 }
40 }
41
42 void RGWSyncTraceNode::log(int level, const string& s)
43 {
44 status = s;
45 history.push_back(status);
46 /* dump output on either rgw_sync, or rgw -- but only once */
47 if (cct->_conf->subsys.should_gather(ceph_subsys_rgw_sync, level)) {
48 lsubdout(cct, rgw_sync,
49 ceph::dout::need_dynamic(level)) << "RGW-SYNC:" << to_str() << dendl;
50 } else {
51 lsubdout(cct, rgw,
52 ceph::dout::need_dynamic(level)) << "RGW-SYNC:" << to_str() << dendl;
53 }
54 }
55
56
57 class RGWSyncTraceServiceMapThread : public RGWRadosThread {
58 RGWRados *store;
59 RGWSyncTraceManager *manager;
60
61 uint64_t interval_msec() override {
62 return cct->_conf->rgw_sync_trace_servicemap_update_interval * 1000;
63 }
64 public:
65 RGWSyncTraceServiceMapThread(RGWRados *_store, RGWSyncTraceManager *_manager)
66 : RGWRadosThread(_store, "sync-trace"), store(_store), manager(_manager) {}
67
68 int process() override;
69 };
70
71 int RGWSyncTraceServiceMapThread::process()
72 {
73 map<string, string> status;
74 status["current_sync"] = manager->get_active_names();
75 int ret = store->update_service_map(std::move(status));
76 if (ret < 0) {
77 ldout(store->ctx(), 0) << "ERROR: update_service_map() returned ret=" << ret << dendl;
78 }
79 return 0;
80 }
81
82 RGWSyncTraceNodeRef RGWSyncTraceManager::add_node(const RGWSyncTraceNodeRef& parent,
83 const std::string& type,
84 const std::string& id)
85 {
86 shunique_lock wl(lock, ceph::acquire_unique);
87 auto handle = alloc_handle();
88 RGWSyncTraceNodeRef& ref = nodes[handle];
89 ref.reset(new RGWSyncTraceNode(cct, handle, parent, type, id));
90 // return a separate shared_ptr that calls finish() on the node instead of
91 // deleting it. the lambda capture holds a reference to the original 'ref'
92 auto deleter = [ref, this] (RGWSyncTraceNode *node) { finish_node(node); };
93 return {ref.get(), deleter};
94 }
95
96 bool RGWSyncTraceNode::match(const string& search_term, bool search_history)
97 {
98 try {
99 std::regex expr(search_term);
100 std::smatch m;
101
102 if (regex_search(prefix, m, expr)) {
103 return true;
104 }
105 if (regex_search(status, m,expr)) {
106 return true;
107 }
108 if (!search_history) {
109 return false;
110 }
111
112 for (auto h : history) {
113 if (regex_search(h, m, expr)) {
114 return true;
115 }
116 }
117 } catch (const std::regex_error& e) {
118 ldout(cct, 5) << "NOTICE: sync trace: bad expression: bad regex search term" << dendl;
119 }
120
121 return false;
122 }
123
124 void RGWSyncTraceManager::init(RGWRados *store)
125 {
126 service_map_thread = new RGWSyncTraceServiceMapThread(store, this);
127 service_map_thread->start();
128 }
129
130 RGWSyncTraceManager::~RGWSyncTraceManager()
131 {
132 cct->get_admin_socket()->unregister_commands(this);
133 service_map_thread->stop();
134 delete service_map_thread;
135
136 nodes.clear();
137 }
138
139 int RGWSyncTraceManager::hook_to_admin_command()
140 {
141 AdminSocket *admin_socket = cct->get_admin_socket();
142
143 admin_commands = { { "sync trace show name=search,type=CephString,req=false", "sync trace show [filter_str]: show current multisite tracing information" },
144 { "sync trace history name=search,type=CephString,req=false", "sync trace history [filter_str]: show history of multisite tracing information" },
145 { "sync trace active name=search,type=CephString,req=false", "show active multisite sync entities information" },
146 { "sync trace active_short name=search,type=CephString,req=false", "show active multisite sync entities entries" } };
147 for (auto cmd : admin_commands) {
148 int r = admin_socket->register_command(cmd[0], this,
149 cmd[1]);
150 if (r < 0) {
151 lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
152 return r;
153 }
154 }
155 return 0;
156 }
157
158 static void dump_node(RGWSyncTraceNode *entry, bool show_history, Formatter *f)
159 {
160 f->open_object_section("entry");
161 ::encode_json("status", entry->to_str(), f);
162 if (show_history) {
163 f->open_array_section("history");
164 for (auto h : entry->get_history()) {
165 ::encode_json("entry", h, f);
166 }
167 f->close_section();
168 }
169 f->close_section();
170 }
171
172 string RGWSyncTraceManager::get_active_names()
173 {
174 shunique_lock rl(lock, ceph::acquire_shared);
175
176 stringstream ss;
177 JSONFormatter f;
178
179 f.open_array_section("result");
180 for (auto n : nodes) {
181 auto& entry = n.second;
182
183 if (!entry->test_flags(RGW_SNS_FLAG_ACTIVE)) {
184 continue;
185 }
186 const string& name = entry->get_resource_name();
187 if (!name.empty()) {
188 ::encode_json("entry", name, &f);
189 }
190 f.flush(ss);
191 }
192 f.close_section();
193 f.flush(ss);
194
195 return ss.str();
196 }
197
198 int RGWSyncTraceManager::call(std::string_view command, const cmdmap_t& cmdmap,
199 Formatter *f,
200 std::ostream& ss,
201 bufferlist& out) {
202
203 bool show_history = (command == "sync trace history");
204 bool show_short = (command == "sync trace active_short");
205 bool show_active = (command == "sync trace active") || show_short;
206
207 string search;
208
209 auto si = cmdmap.find("search");
210 if (si != cmdmap.end()) {
211 search = boost::get<string>(si->second);
212 }
213
214 shunique_lock rl(lock, ceph::acquire_shared);
215
216 f->open_object_section("result");
217 f->open_array_section("running");
218 for (auto n : nodes) {
219 auto& entry = n.second;
220
221 if (!search.empty() && !entry->match(search, show_history)) {
222 continue;
223 }
224 if (show_active && !entry->test_flags(RGW_SNS_FLAG_ACTIVE)) {
225 continue;
226 }
227 if (show_short) {
228 const string& name = entry->get_resource_name();
229 if (!name.empty()) {
230 ::encode_json("entry", name, f);
231 }
232 } else {
233 dump_node(entry.get(), show_history, f);
234 }
235 f->flush(out);
236 }
237 f->close_section();
238
239 f->open_array_section("complete");
240 for (auto& entry : complete_nodes) {
241 if (!search.empty() && !entry->match(search, show_history)) {
242 continue;
243 }
244 if (show_active && !entry->test_flags(RGW_SNS_FLAG_ACTIVE)) {
245 continue;
246 }
247 dump_node(entry.get(), show_history, f);
248 f->flush(out);
249 }
250 f->close_section();
251
252 f->close_section();
253
254 return 0;
255 }
256
257 void RGWSyncTraceManager::finish_node(RGWSyncTraceNode *node)
258 {
259 RGWSyncTraceNodeRef old_node;
260
261 {
262 shunique_lock wl(lock, ceph::acquire_unique);
263 if (!node) {
264 return;
265 }
266 auto iter = nodes.find(node->handle);
267 if (iter == nodes.end()) {
268 /* not found, already finished */
269 return;
270 }
271
272 if (complete_nodes.full()) {
273 /* take a reference to the entry that is going to be evicted,
274 * can't let it get evicted under lock held, otherwise
275 * it's a deadlock as it will call finish_node()
276 */
277 old_node = complete_nodes.front();
278 }
279
280 complete_nodes.push_back(iter->second);
281 nodes.erase(iter);
282 }
283 };
284
285 #endif
286