]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_sync_trace.cc
dc29cfa857ec0b1b71d978fd7a938efcc2ac5aea
[ceph.git] / ceph / src / rgw / rgw_sync_trace.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef CEPH_RGW_SYNC_TRACE_H
5 #define CEPH_RGW_SYNC_TRACE_H
6
7 #include <regex>
8
9 #include "common/debug.h"
10 #include "common/ceph_json.h"
11
12 #include "rgw_sync_trace.h"
13 #include "rgw_rados.h"
14 #include "rgw_worker.h"
15
16
17 #define dout_context g_ceph_context
18
19 using namespace std;
20
21 RGWSyncTraceNode::RGWSyncTraceNode(CephContext *_cct, uint64_t _handle,
22 const RGWSyncTraceNodeRef& _parent,
23 const string& _type, const string& _id) : cct(_cct),
24 parent(_parent),
25 type(_type),
26 id(_id),
27 handle(_handle),
28 history(cct->_conf->rgw_sync_trace_per_node_log_size)
29 {
30 if (parent.get()) {
31 prefix = parent->get_prefix();
32 }
33
34 if (!type.empty()) {
35 prefix += type;
36 if (!id.empty()) {
37 prefix += "[" + id + "]";
38 }
39 prefix += ":";
40 }
41 }
42
43 void RGWSyncTraceNode::log(int level, const string& s)
44 {
45 status = s;
46 history.push_back(status);
47 /* dump output on either rgw_sync, or rgw -- but only once */
48 if (cct->_conf->subsys.should_gather(ceph_subsys_rgw_sync, level)) {
49 lsubdout(cct, rgw_sync,
50 ceph::dout::need_dynamic(level)) << "RGW-SYNC:" << to_str() << dendl;
51 } else {
52 lsubdout(cct, rgw,
53 ceph::dout::need_dynamic(level)) << "RGW-SYNC:" << to_str() << dendl;
54 }
55 }
56
57
58 class RGWSyncTraceServiceMapThread : public RGWRadosThread {
59 RGWRados *store;
60 RGWSyncTraceManager *manager;
61
62 uint64_t interval_msec() override {
63 return cct->_conf->rgw_sync_trace_servicemap_update_interval * 1000;
64 }
65 public:
66 RGWSyncTraceServiceMapThread(RGWRados *_store, RGWSyncTraceManager *_manager)
67 : RGWRadosThread(_store, "sync-trace"), store(_store), manager(_manager) {}
68
69 int process(const DoutPrefixProvider *dpp) override;
70 };
71
72 int RGWSyncTraceServiceMapThread::process(const DoutPrefixProvider *dpp)
73 {
74 map<string, string> status;
75 status["current_sync"] = manager->get_active_names();
76 int ret = store->update_service_map(dpp, std::move(status));
77 if (ret < 0) {
78 ldout(store->ctx(), 0) << "ERROR: update_service_map() returned ret=" << ret << dendl;
79 }
80 return 0;
81 }
82
83 RGWSyncTraceNodeRef RGWSyncTraceManager::add_node(const RGWSyncTraceNodeRef& parent,
84 const std::string& type,
85 const std::string& id)
86 {
87 shunique_lock wl(lock, ceph::acquire_unique);
88 auto handle = alloc_handle();
89 RGWSyncTraceNodeRef& ref = nodes[handle];
90 ref.reset(new RGWSyncTraceNode(cct, handle, parent, type, id));
91 // return a separate shared_ptr that calls finish() on the node instead of
92 // deleting it. the lambda capture holds a reference to the original 'ref'
93 auto deleter = [ref, this] (RGWSyncTraceNode *node) { finish_node(node); };
94 return {ref.get(), deleter};
95 }
96
97 bool RGWSyncTraceNode::match(const string& search_term, bool search_history)
98 {
99 try {
100 std::regex expr(search_term);
101 std::smatch m;
102
103 if (regex_search(prefix, m, expr)) {
104 return true;
105 }
106 if (regex_search(status, m,expr)) {
107 return true;
108 }
109 if (!search_history) {
110 return false;
111 }
112
113 for (auto h : history) {
114 if (regex_search(h, m, expr)) {
115 return true;
116 }
117 }
118 } catch (const std::regex_error& e) {
119 ldout(cct, 5) << "NOTICE: sync trace: bad expression: bad regex search term" << dendl;
120 }
121
122 return false;
123 }
124
125 void RGWSyncTraceManager::init(RGWRados *store)
126 {
127 service_map_thread = new RGWSyncTraceServiceMapThread(store, this);
128 service_map_thread->start();
129 }
130
131 RGWSyncTraceManager::~RGWSyncTraceManager()
132 {
133 cct->get_admin_socket()->unregister_commands(this);
134 service_map_thread->stop();
135 delete service_map_thread;
136
137 nodes.clear();
138 }
139
140 int RGWSyncTraceManager::hook_to_admin_command()
141 {
142 AdminSocket *admin_socket = cct->get_admin_socket();
143
144 admin_commands = { { "sync trace show name=search,type=CephString,req=false", "sync trace show [filter_str]: show current multisite tracing information" },
145 { "sync trace history name=search,type=CephString,req=false", "sync trace history [filter_str]: show history of multisite tracing information" },
146 { "sync trace active name=search,type=CephString,req=false", "show active multisite sync entities information" },
147 { "sync trace active_short name=search,type=CephString,req=false", "show active multisite sync entities entries" } };
148 for (auto cmd : admin_commands) {
149 int r = admin_socket->register_command(cmd[0], this,
150 cmd[1]);
151 if (r < 0) {
152 lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
153 return r;
154 }
155 }
156 return 0;
157 }
158
159 static void dump_node(RGWSyncTraceNode *entry, bool show_history, Formatter *f)
160 {
161 f->open_object_section("entry");
162 ::encode_json("status", entry->to_str(), f);
163 if (show_history) {
164 f->open_array_section("history");
165 for (auto h : entry->get_history()) {
166 ::encode_json("entry", h, f);
167 }
168 f->close_section();
169 }
170 f->close_section();
171 }
172
173 string RGWSyncTraceManager::get_active_names()
174 {
175 shunique_lock rl(lock, ceph::acquire_shared);
176
177 stringstream ss;
178 JSONFormatter f;
179
180 f.open_array_section("result");
181 for (auto n : nodes) {
182 auto& entry = n.second;
183
184 if (!entry->test_flags(RGW_SNS_FLAG_ACTIVE)) {
185 continue;
186 }
187 const string& name = entry->get_resource_name();
188 if (!name.empty()) {
189 ::encode_json("entry", name, &f);
190 }
191 f.flush(ss);
192 }
193 f.close_section();
194 f.flush(ss);
195
196 return ss.str();
197 }
198
199 int RGWSyncTraceManager::call(std::string_view command, const cmdmap_t& cmdmap,
200 Formatter *f,
201 std::ostream& ss,
202 bufferlist& out) {
203
204 bool show_history = (command == "sync trace history");
205 bool show_short = (command == "sync trace active_short");
206 bool show_active = (command == "sync trace active") || show_short;
207
208 string search;
209
210 auto si = cmdmap.find("search");
211 if (si != cmdmap.end()) {
212 search = boost::get<string>(si->second);
213 }
214
215 shunique_lock rl(lock, ceph::acquire_shared);
216
217 f->open_object_section("result");
218 f->open_array_section("running");
219 for (auto n : nodes) {
220 auto& entry = n.second;
221
222 if (!search.empty() && !entry->match(search, show_history)) {
223 continue;
224 }
225 if (show_active && !entry->test_flags(RGW_SNS_FLAG_ACTIVE)) {
226 continue;
227 }
228 if (show_short) {
229 const string& name = entry->get_resource_name();
230 if (!name.empty()) {
231 ::encode_json("entry", name, f);
232 }
233 } else {
234 dump_node(entry.get(), show_history, f);
235 }
236 f->flush(out);
237 }
238 f->close_section();
239
240 f->open_array_section("complete");
241 for (auto& entry : complete_nodes) {
242 if (!search.empty() && !entry->match(search, show_history)) {
243 continue;
244 }
245 if (show_active && !entry->test_flags(RGW_SNS_FLAG_ACTIVE)) {
246 continue;
247 }
248 dump_node(entry.get(), show_history, f);
249 f->flush(out);
250 }
251 f->close_section();
252
253 f->close_section();
254
255 return 0;
256 }
257
258 void RGWSyncTraceManager::finish_node(RGWSyncTraceNode *node)
259 {
260 RGWSyncTraceNodeRef old_node;
261
262 {
263 shunique_lock wl(lock, ceph::acquire_unique);
264 if (!node) {
265 return;
266 }
267 auto iter = nodes.find(node->handle);
268 if (iter == nodes.end()) {
269 /* not found, already finished */
270 return;
271 }
272
273 if (complete_nodes.full()) {
274 /* take a reference to the entry that is going to be evicted,
275 * can't let it get evicted under lock held, otherwise
276 * it's a deadlock as it will call finish_node()
277 */
278 old_node = complete_nodes.front();
279 }
280
281 complete_nodes.push_back(iter->second);
282 nodes.erase(iter);
283 }
284 };
285
286 #endif
287