]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/driver/rados/rgw_sync_trace.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / rgw / driver / rados / rgw_sync_trace.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef CEPH_RGW_SYNC_TRACE_H
5 #define CEPH_RGW_SYNC_TRACE_H
6
7 #include <regex>
8
9 #include "common/debug.h"
10 #include "common/ceph_json.h"
11
12 #include "rgw_sync_trace.h"
13 #include "rgw_rados.h"
14 #include "rgw_worker.h"
15
16 #define dout_context g_ceph_context
17
18 static constexpr auto dout_subsys = ceph_subsys_rgw;
19
20 using namespace std;
21
22
23 RGWSyncTraceNode::RGWSyncTraceNode(CephContext *_cct, uint64_t _handle,
24 const RGWSyncTraceNodeRef& _parent,
25 const string& _type, const string& _id) : cct(_cct),
26 parent(_parent),
27 type(_type),
28 id(_id),
29 handle(_handle),
30 history(cct->_conf->rgw_sync_trace_per_node_log_size)
31 {
32 if (parent.get()) {
33 prefix = parent->get_prefix();
34 }
35
36 if (!type.empty()) {
37 prefix += type;
38 if (!id.empty()) {
39 prefix += "[" + id + "]";
40 }
41 prefix += ":";
42 }
43 }
44
45 void RGWSyncTraceNode::log(int level, const string& s)
46 {
47 status = s;
48 history.push_back(status);
49 /* dump output on either rgw_sync, or rgw -- but only once */
50 if (cct->_conf->subsys.should_gather(ceph_subsys_rgw_sync, level)) {
51 lsubdout(cct, rgw_sync,
52 ceph::dout::need_dynamic(level)) << "RGW-SYNC:" << to_str() << dendl;
53 } else {
54 lsubdout(cct, rgw,
55 ceph::dout::need_dynamic(level)) << "RGW-SYNC:" << to_str() << dendl;
56 }
57 }
58
59
60 class RGWSyncTraceServiceMapThread : public RGWRadosThread {
61 RGWRados *store;
62 RGWSyncTraceManager *manager;
63
64 uint64_t interval_msec() override {
65 return cct->_conf->rgw_sync_trace_servicemap_update_interval * 1000;
66 }
67 public:
68 RGWSyncTraceServiceMapThread(RGWRados *_store, RGWSyncTraceManager *_manager)
69 : RGWRadosThread(_store, "sync-trace"), store(_store), manager(_manager) {}
70
71 int process(const DoutPrefixProvider *dpp) override;
72 };
73
74 int RGWSyncTraceServiceMapThread::process(const DoutPrefixProvider *dpp)
75 {
76 map<string, string> status;
77 status["current_sync"] = manager->get_active_names();
78 int ret = store->update_service_map(dpp, std::move(status));
79 if (ret < 0) {
80 ldout(store->ctx(), 0) << "ERROR: update_service_map() returned ret=" << ret << dendl;
81 }
82 return 0;
83 }
84
85 RGWSyncTraceNodeRef RGWSyncTraceManager::add_node(const RGWSyncTraceNodeRef& parent,
86 const std::string& type,
87 const std::string& id)
88 {
89 shunique_lock wl(lock, ceph::acquire_unique);
90 auto handle = alloc_handle();
91 RGWSyncTraceNodeRef& ref = nodes[handle];
92 ref.reset(new RGWSyncTraceNode(cct, handle, parent, type, id));
93 // return a separate shared_ptr that calls finish() on the node instead of
94 // deleting it. the lambda capture holds a reference to the original 'ref'
95 auto deleter = [ref, this] (RGWSyncTraceNode *node) { finish_node(node); };
96 return {ref.get(), deleter};
97 }
98
99 bool RGWSyncTraceNode::match(const string& search_term, bool search_history)
100 {
101 try {
102 std::regex expr(search_term);
103 std::smatch m;
104
105 if (regex_search(prefix, m, expr)) {
106 return true;
107 }
108 if (regex_search(status, m,expr)) {
109 return true;
110 }
111 if (!search_history) {
112 return false;
113 }
114
115 for (auto h : history) {
116 if (regex_search(h, m, expr)) {
117 return true;
118 }
119 }
120 } catch (const std::regex_error& e) {
121 ldout(cct, 5) << "NOTICE: sync trace: bad expression: bad regex search term" << dendl;
122 }
123
124 return false;
125 }
126
127 void RGWSyncTraceManager::init(RGWRados *store)
128 {
129 service_map_thread = new RGWSyncTraceServiceMapThread(store, this);
130 service_map_thread->start();
131 }
132
133 RGWSyncTraceManager::~RGWSyncTraceManager()
134 {
135 cct->get_admin_socket()->unregister_commands(this);
136 service_map_thread->stop();
137 delete service_map_thread;
138
139 nodes.clear();
140 }
141
142 int RGWSyncTraceManager::hook_to_admin_command()
143 {
144 AdminSocket *admin_socket = cct->get_admin_socket();
145
146 admin_commands = { { "sync trace show name=search,type=CephString,req=false", "sync trace show [filter_str]: show current multisite tracing information" },
147 { "sync trace history name=search,type=CephString,req=false", "sync trace history [filter_str]: show history of multisite tracing information" },
148 { "sync trace active name=search,type=CephString,req=false", "show active multisite sync entities information" },
149 { "sync trace active_short name=search,type=CephString,req=false", "show active multisite sync entities entries" } };
150 for (auto cmd : admin_commands) {
151 int r = admin_socket->register_command(cmd[0], this,
152 cmd[1]);
153 if (r < 0) {
154 lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
155 return r;
156 }
157 }
158 return 0;
159 }
160
161 static void dump_node(RGWSyncTraceNode *entry, bool show_history, Formatter *f)
162 {
163 f->open_object_section("entry");
164 ::encode_json("status", entry->to_str(), f);
165 if (show_history) {
166 f->open_array_section("history");
167 for (auto h : entry->get_history()) {
168 ::encode_json("entry", h, f);
169 }
170 f->close_section();
171 }
172 f->close_section();
173 }
174
175 string RGWSyncTraceManager::get_active_names()
176 {
177 shunique_lock rl(lock, ceph::acquire_shared);
178
179 stringstream ss;
180 JSONFormatter f;
181
182 f.open_array_section("result");
183 for (auto n : nodes) {
184 auto& entry = n.second;
185
186 if (!entry->test_flags(RGW_SNS_FLAG_ACTIVE)) {
187 continue;
188 }
189 const string& name = entry->get_resource_name();
190 if (!name.empty()) {
191 ::encode_json("entry", name, &f);
192 }
193 f.flush(ss);
194 }
195 f.close_section();
196 f.flush(ss);
197
198 return ss.str();
199 }
200
201 int RGWSyncTraceManager::call(std::string_view command, const cmdmap_t& cmdmap,
202 const bufferlist&,
203 Formatter *f,
204 std::ostream& ss,
205 bufferlist& out) {
206
207 bool show_history = (command == "sync trace history");
208 bool show_short = (command == "sync trace active_short");
209 bool show_active = (command == "sync trace active") || show_short;
210
211 string search;
212
213 auto si = cmdmap.find("search");
214 if (si != cmdmap.end()) {
215 search = boost::get<string>(si->second);
216 }
217
218 shunique_lock rl(lock, ceph::acquire_shared);
219
220 f->open_object_section("result");
221 f->open_array_section("running");
222 for (auto n : nodes) {
223 auto& entry = n.second;
224
225 if (!search.empty() && !entry->match(search, show_history)) {
226 continue;
227 }
228 if (show_active && !entry->test_flags(RGW_SNS_FLAG_ACTIVE)) {
229 continue;
230 }
231 if (show_short) {
232 const string& name = entry->get_resource_name();
233 if (!name.empty()) {
234 ::encode_json("entry", name, f);
235 }
236 } else {
237 dump_node(entry.get(), show_history, f);
238 }
239 f->flush(out);
240 }
241 f->close_section();
242
243 f->open_array_section("complete");
244 for (auto& entry : complete_nodes) {
245 if (!search.empty() && !entry->match(search, show_history)) {
246 continue;
247 }
248 if (show_active && !entry->test_flags(RGW_SNS_FLAG_ACTIVE)) {
249 continue;
250 }
251 dump_node(entry.get(), show_history, f);
252 f->flush(out);
253 }
254 f->close_section();
255
256 f->close_section();
257
258 return 0;
259 }
260
261 void RGWSyncTraceManager::finish_node(RGWSyncTraceNode *node)
262 {
263 RGWSyncTraceNodeRef old_node;
264
265 {
266 shunique_lock wl(lock, ceph::acquire_unique);
267 if (!node) {
268 return;
269 }
270 auto iter = nodes.find(node->handle);
271 if (iter == nodes.end()) {
272 /* not found, already finished */
273 return;
274 }
275
276 if (complete_nodes.full()) {
277 /* take a reference to the entry that is going to be evicted,
278 * can't let it get evicted under lock held, otherwise
279 * it's a deadlock as it will call finish_node()
280 */
281 old_node = complete_nodes.front();
282 }
283
284 complete_nodes.push_back(iter->second);
285 nodes.erase(iter);
286 }
287 };
288
289 #endif
290