1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_SYNC_TRACE_H
5 #define CEPH_RGW_SYNC_TRACE_H
9 #include "common/debug.h"
10 #include "common/ceph_json.h"
12 #include "rgw_sync_trace.h"
13 #include "rgw_rados.h"
14 #include "rgw_worker.h"
// File-level logging configuration: dout_context is defined as the global
// g_ceph_context and dout_subsys is set to ceph_subsys_rgw.
// NOTE(review): presumably consumed by the dout/ldout macros pulled in via
// common/debug.h -- confirm.
10 #define dout_context g_ceph_context
18 static constexpr auto dout_subsys
= ceph_subsys_rgw
;
// RGWSyncTraceNode constructor.
//   _cct      context pointer; stored in `cct` and used to read configuration
//   _handle   64-bit handle value for this node
//   _parent   reference to the parent node
//   _type/_id strings describing the node
// `history` is constructed from the rgw_sync_trace_per_node_log_size option
// (presumably its capacity -- confirm against the member's type).
// The visible body seeds `prefix` from parent->get_prefix() and appends
// "[" + id + "]" to it.
// NOTE(review): the embedded line numbers jump (25 -> 30 -> 33 -> 39), so the
// other member initializers and whatever conditions guard the two prefix
// statements are not visible in this listing -- check the full file.
23 RGWSyncTraceNode::RGWSyncTraceNode(CephContext
*_cct
, uint64_t _handle
,
24 const RGWSyncTraceNodeRef
& _parent
,
25 const string
& _type
, const string
& _id
) : cct(_cct
),
30 history(cct
->_conf
->rgw_sync_trace_per_node_log_size
)
33 prefix
= parent
->get_prefix();
39 prefix
+= "[" + id
+ "]";
// Records a status line on this node and writes it to the debug log.
//   level  log level; passed to should_gather() and to need_dynamic()
//   s      the message text
// The visible body appends `status` to `history` and then emits
// "RGW-SYNC:" followed by to_str(). should_gather() on ceph_subsys_rgw_sync
// selects between two output statements so the line is written only once
// (see the inline comment below).
// NOTE(review): the statement that consumes `s` (presumably assigning it to
// `status`) and the head of the second output statement are not visible in
// this listing -- confirm against the full file.
45 void RGWSyncTraceNode::log(int level
, const string
& s
)
48 history
.push_back(status
);
49 /* dump output on either rgw_sync, or rgw -- but only once */
50 if (cct
->_conf
->subsys
.should_gather(ceph_subsys_rgw_sync
, level
)) {
51 lsubdout(cct
, rgw_sync
,
52 ceph::dout::need_dynamic(level
)) << "RGW-SYNC:" << to_str() << dendl
;
55 ceph::dout::need_dynamic(level
)) << "RGW-SYNC:" << to_str() << dendl
;
// Thread class derived from RGWRadosThread that keeps a pointer to the
// owning RGWSyncTraceManager.
//   - interval_msec() returns the rgw_sync_trace_servicemap_update_interval
//     option multiplied by 1000 (presumably seconds -> milliseconds -- confirm).
//   - The constructor forwards _store and the name "sync-trace" to the
//     RGWRadosThread constructor and stores _store / _manager in members.
//   - process() is only declared here.
// NOTE(review): the declaration of the `store` member, the access specifiers
// and the closing braces are not visible in this listing.
60 class RGWSyncTraceServiceMapThread
: public RGWRadosThread
{
62 RGWSyncTraceManager
*manager
;
64 uint64_t interval_msec() override
{
65 return cct
->_conf
->rgw_sync_trace_servicemap_update_interval
* 1000;
68 RGWSyncTraceServiceMapThread(RGWRados
*_store
, RGWSyncTraceManager
*_manager
)
69 : RGWRadosThread(_store
, "sync-trace"), store(_store
), manager(_manager
) {}
71 int process(const DoutPrefixProvider
*dpp
) override
;
// Builds a string->string map whose "current_sync" entry is the value of
// manager->get_active_names(), and passes it (moved) together with dpp to
// store->update_service_map(). The result is kept in `ret`, and an
// "ERROR: update_service_map() returned ret=" line is logged at level 0.
// NOTE(review): the condition guarding the error log (presumably ret < 0)
// and the function's return statement are not visible in this listing.
74 int RGWSyncTraceServiceMapThread::process(const DoutPrefixProvider
*dpp
)
76 map
<string
, string
> status
;
77 status
["current_sync"] = manager
->get_active_names();
78 int ret
= store
->update_service_map(dpp
, std::move(status
));
80 ldout(store
->ctx(), 0) << "ERROR: update_service_map() returned ret=" << ret
<< dendl
;
// Creates a trace node and registers it in `nodes`.
//   parent  parent node reference, forwarded to the RGWSyncTraceNode constructor
//   type    node type string
//   id      node id string
// Steps visible below: construct a shunique_lock on `lock` with
// ceph::acquire_unique, obtain a handle from alloc_handle(), and store a
// newly allocated RGWSyncTraceNode in nodes[handle].
// Returns a second RGWSyncTraceNodeRef to the same raw pointer whose deleter
// calls finish_node(node) rather than deleting the object. The deleter's
// lambda captures `ref` by value, so it holds its own reference to the
// node stored in the map.
85 RGWSyncTraceNodeRef
RGWSyncTraceManager::add_node(const RGWSyncTraceNodeRef
& parent
,
86 const std::string
& type
,
87 const std::string
& id
)
89 shunique_lock
wl(lock
, ceph::acquire_unique
);
90 auto handle
= alloc_handle();
91 RGWSyncTraceNodeRef
& ref
= nodes
[handle
];
92 ref
.reset(new RGWSyncTraceNode(cct
, handle
, parent
, type
, id
));
93 // return a separate shared_ptr that calls finish() on the node instead of
94 // deleting it. the lambda capture holds a reference to the original 'ref'
95 auto deleter
= [ref
, this] (RGWSyncTraceNode
*node
) { finish_node(node
); };
96 return {ref
.get(), deleter
};
// Tests this node against a regular expression.
//   search_term     pattern text; compiled into a std::regex
//   search_history  when false, the history entries are not searched
// regex_search() is applied to `prefix`, then to `status`, and then to each
// element of `history`. A std::regex_error is caught and reported at log
// level 5.
// NOTE(review): the `try` head, the declaration of `m` and every return
// statement are missing from this listing; presumably the function returns
// true on the first hit and false otherwise (including on a bad pattern) --
// confirm against the full file.
// NOTE(review): `for (auto h : history)` copies each element per iteration.
99 bool RGWSyncTraceNode::match(const string
& search_term
, bool search_history
)
102 std::regex
expr(search_term
);
105 if (regex_search(prefix
, m
, expr
)) {
108 if (regex_search(status
, m
,expr
)) {
111 if (!search_history
) {
115 for (auto h
: history
) {
116 if (regex_search(h
, m
, expr
)) {
120 } catch (const std::regex_error
& e
) {
121 ldout(cct
, 5) << "NOTICE: sync trace: bad expression: bad regex search term" << dendl
;
// Allocates an RGWSyncTraceServiceMapThread for `store`, passing this manager
// to it, keeps the raw pointer in service_map_thread and calls start() on it.
// The pointer is owned by this object: the destructor stops and deletes it.
127 void RGWSyncTraceManager::init(RGWRados
*store
)
129 service_map_thread
= new RGWSyncTraceServiceMapThread(store
, this);
130 service_map_thread
->start();
// Destructor: unregisters this object's commands from the context's admin
// socket, then stops and deletes service_map_thread.
// NOTE(review): service_map_thread is dereferenced without a visible null
// check; this assumes init() has run before destruction -- confirm.
133 RGWSyncTraceManager::~RGWSyncTraceManager()
135 cct
->get_admin_socket()->unregister_commands(this);
136 service_map_thread
->stop();
137 delete service_map_thread
;
// Registers the "sync trace" commands on the context's admin socket.
// admin_commands is filled with four {command spec, help text} pairs
// (show, history, active, active_short), each accepting an optional
// `search` CephString argument (req=false). Every pair is registered with
// register_command(), passing cmd[0] and `this`; the result is kept in `r`
// and an error including `r` is written with lderr.
// NOTE(review): the remaining register_command() arguments, the condition
// guarding the error log and the return statements are missing from this
// listing.
// NOTE(review): `for (auto cmd : admin_commands)` copies each element.
142 int RGWSyncTraceManager::hook_to_admin_command()
144 AdminSocket
*admin_socket
= cct
->get_admin_socket();
146 admin_commands
= { { "sync trace show name=search,type=CephString,req=false", "sync trace show [filter_str]: show current multisite tracing information" },
147 { "sync trace history name=search,type=CephString,req=false", "sync trace history [filter_str]: show history of multisite tracing information" },
148 { "sync trace active name=search,type=CephString,req=false", "show active multisite sync entities information" },
149 { "sync trace active_short name=search,type=CephString,req=false", "show active multisite sync entities entries" } };
150 for (auto cmd
: admin_commands
) {
151 int r
= admin_socket
->register_command(cmd
[0], this,
154 lderr(cct
) << "ERROR: fail to register admin socket command (r=" << r
<< ")" << dendl
;
// Writes one trace node to the formatter.
//   entry         node to dump
//   show_history  history switch (its use is not visible in this listing;
//                 presumably it guards the "history" section -- confirm)
//   f             output formatter
// Opens an object section named "entry", encodes entry->to_str() under
// "status", opens an array section named "history" and encodes every element
// of entry->get_history() under "entry".
// NOTE(review): the close_section() calls are not visible in this listing.
161 static void dump_node(RGWSyncTraceNode
*entry
, bool show_history
, Formatter
*f
)
163 f
->open_object_section("entry");
164 ::encode_json("status", entry
->to_str(), f
);
166 f
->open_array_section("history");
167 for (auto h
: entry
->get_history()) {
168 ::encode_json("entry", h
, f
);
// Returns a string describing the nodes flagged RGW_SNS_FLAG_ACTIVE.
// Constructs a shunique_lock on `lock` with ceph::acquire_shared, opens an
// array section named "result" on a local formatter `f`, and walks `nodes`.
// Each entry is tested with test_flags(RGW_SNS_FLAG_ACTIVE); the resource
// name from get_resource_name() is encoded under "entry".
// NOTE(review): the formatter's declaration, the bodies of the skip
// conditions (presumably `continue`) and the code that turns the formatter
// output into the returned string are missing from this listing.
// NOTE(review): `for (auto n : nodes)` copies each map element, including
// its node reference, on every iteration.
175 string
RGWSyncTraceManager::get_active_names()
177 shunique_lock
rl(lock
, ceph::acquire_shared
);
182 f
.open_array_section("result");
183 for (auto n
: nodes
) {
184 auto& entry
= n
.second
;
186 if (!entry
->test_flags(RGW_SNS_FLAG_ACTIVE
)) {
189 const string
& name
= entry
->get_resource_name();
191 ::encode_json("entry", name
, &f
);
// Admin-socket command handler for the "sync trace ..." commands.
//   command  full command string; compared against "sync trace history",
//            "sync trace active_short" and "sync trace active"
//   cmdmap   parsed arguments; an optional "search" entry is read as a string
// Derived switches:
//   show_history  command is "sync trace history"
//   show_short    command is "sync trace active_short"
//   show_active   command is "sync trace active" or show_short is set
// With `lock` held via ceph::acquire_shared, the output is an object section
// "result" containing two array sections:
//   "running"   built from `nodes`
//   "complete"  built from `complete_nodes`
// In both loops an entry is tested with match(search, show_history) when
// `search` is non-empty, and with test_flags(RGW_SNS_FLAG_ACTIVE) when
// show_active is set. The running loop contains both a name-only
// encode_json("entry", ...) and a dump_node() call; the complete loop only
// calls dump_node().
// NOTE(review): the rest of the parameter list, the bodies of the filter
// conditions (presumably `continue`), the branch choosing between the short
// and full output (presumably show_short), the close_section() calls and the
// return value are missing from this listing -- confirm against the full file.
// NOTE(review): `for (auto n : nodes)` copies each map element per iteration.
201 int RGWSyncTraceManager::call(std::string_view command
, const cmdmap_t
& cmdmap
,
207 bool show_history
= (command
== "sync trace history");
208 bool show_short
= (command
== "sync trace active_short");
209 bool show_active
= (command
== "sync trace active") || show_short
;
213 auto si
= cmdmap
.find("search");
214 if (si
!= cmdmap
.end()) {
215 search
= boost::get
<string
>(si
->second
);
218 shunique_lock
rl(lock
, ceph::acquire_shared
);
220 f
->open_object_section("result");
221 f
->open_array_section("running");
222 for (auto n
: nodes
) {
223 auto& entry
= n
.second
;
225 if (!search
.empty() && !entry
->match(search
, show_history
)) {
228 if (show_active
&& !entry
->test_flags(RGW_SNS_FLAG_ACTIVE
)) {
232 const string
& name
= entry
->get_resource_name();
234 ::encode_json("entry", name
, f
);
237 dump_node(entry
.get(), show_history
, f
);
243 f
->open_array_section("complete");
244 for (auto& entry
: complete_nodes
) {
245 if (!search
.empty() && !entry
->match(search
, show_history
)) {
248 if (show_active
&& !entry
->test_flags(RGW_SNS_FLAG_ACTIVE
)) {
251 dump_node(entry
.get(), show_history
, f
);
261 void RGWSyncTraceManager::finish_node(RGWSyncTraceNode
*node
)
263 RGWSyncTraceNodeRef old_node
;
266 shunique_lock
wl(lock
, ceph::acquire_unique
);
270 auto iter
= nodes
.find(node
->handle
);
271 if (iter
== nodes
.end()) {
272 /* not found, already finished */
276 if (complete_nodes
.full()) {
277 /* take a reference to the entry that is going to be evicted,
278 * can't let it get evicted under lock held, otherwise
279 * it's a deadlock as it will call finish_node()
281 old_node
= complete_nodes
.front();
284 complete_nodes
.push_back(iter
->second
);