git.proxmox.com Git - ceph.git/blob - ceph/src/osd/OpRequest.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / osd / OpRequest.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2
3 #include "OpRequest.h"
4 #include "common/Formatter.h"
5 #include <iostream>
6 #include <vector>
7 #include "common/debug.h"
8 #include "common/config.h"
9 #include "msg/Message.h"
10 #include "messages/MOSDOp.h"
11 #include "messages/MOSDSubOp.h"
12 #include "messages/MOSDRepOp.h"
13 #include "include/assert.h"
14 #include "osd/osd_types.h"
15
16 #ifdef WITH_LTTNG
17 #define TRACEPOINT_DEFINE
18 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
19 #include "tracing/oprequest.h"
20 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
21 #undef TRACEPOINT_DEFINE
22 #else
23 #define tracepoint(...)
24 #endif
25
26 OpRequest::OpRequest(Message *req, OpTracker *tracker) :
27 TrackedOp(tracker, req->get_recv_stamp()),
28 rmw_flags(0), request(req),
29 hit_flag_points(0), latest_flag_point(0),
30 hitset_inserted(false) {
31 if (req->get_priority() < tracker->cct->_conf->osd_client_op_priority) {
32 // don't warn as quickly for low priority ops
33 warn_interval_multiplier = tracker->cct->_conf->osd_recovery_op_warn_multiple;
34 }
35 if (req->get_type() == CEPH_MSG_OSD_OP) {
36 reqid = static_cast<MOSDOp*>(req)->get_reqid();
37 } else if (req->get_type() == MSG_OSD_SUBOP) {
38 reqid = static_cast<MOSDSubOp*>(req)->reqid;
39 } else if (req->get_type() == MSG_OSD_REPOP) {
40 reqid = static_cast<MOSDRepOp*>(req)->reqid;
41 }
42 mark_event("header_read", request->get_recv_stamp());
43 mark_event("throttled", request->get_throttle_stamp());
44 mark_event("all_read", request->get_recv_complete_stamp());
45 mark_event("dispatched", request->get_dispatch_stamp());
46 }
47
48 void OpRequest::_dump(Formatter *f) const
49 {
50 Message *m = request;
51 f->dump_string("flag_point", state_string());
52 if (m->get_orig_source().is_client()) {
53 f->open_object_section("client_info");
54 stringstream client_name, client_addr;
55 client_name << m->get_orig_source();
56 client_addr << m->get_orig_source_addr();
57 f->dump_string("client", client_name.str());
58 f->dump_string("client_addr", client_addr.str());
59 f->dump_unsigned("tid", m->get_tid());
60 f->close_section(); // client_info
61 }
62 {
63 f->open_array_section("events");
64 Mutex::Locker l(lock);
65 for (auto& i : events) {
66 f->dump_object("event", i);
67 }
68 f->close_section();
69 }
70 }
71
72 void OpRequest::_dump_op_descriptor_unlocked(ostream& stream) const
73 {
74 get_req()->print(stream);
75 }
76
77 void OpRequest::_unregistered() {
78 request->clear_data();
79 request->clear_payload();
80 request->release_message_throttle();
81 request->set_connection(nullptr);
82 }
83
84 bool OpRequest::check_rmw(int flag) {
85 assert(rmw_flags != 0);
86 return rmw_flags & flag;
87 }
88 bool OpRequest::may_read() {
89 return need_read_cap() || check_rmw(CEPH_OSD_RMW_FLAG_CLASS_READ);
90 }
91 bool OpRequest::may_write() {
92 return need_write_cap() || check_rmw(CEPH_OSD_RMW_FLAG_CLASS_WRITE);
93 }
94 bool OpRequest::may_cache() { return check_rmw(CEPH_OSD_RMW_FLAG_CACHE); }
95 bool OpRequest::rwordered_forced() {
96 return check_rmw(CEPH_OSD_RMW_FLAG_RWORDERED);
97 }
98 bool OpRequest::rwordered() {
99 return may_write() || may_cache() || rwordered_forced();
100 }
101
102 bool OpRequest::includes_pg_op() { return check_rmw(CEPH_OSD_RMW_FLAG_PGOP); }
103 bool OpRequest::need_read_cap() {
104 return check_rmw(CEPH_OSD_RMW_FLAG_READ);
105 }
106 bool OpRequest::need_write_cap() {
107 return check_rmw(CEPH_OSD_RMW_FLAG_WRITE);
108 }
109 bool OpRequest::need_promote() {
110 return check_rmw(CEPH_OSD_RMW_FLAG_FORCE_PROMOTE);
111 }
112 bool OpRequest::need_skip_handle_cache() {
113 return check_rmw(CEPH_OSD_RMW_FLAG_SKIP_HANDLE_CACHE);
114 }
115 bool OpRequest::need_skip_promote() {
116 return check_rmw(CEPH_OSD_RMW_FLAG_SKIP_PROMOTE);
117 }
118
119 void OpRequest::set_rmw_flags(int flags) {
120 #ifdef WITH_LTTNG
121 int old_rmw_flags = rmw_flags;
122 #endif
123 rmw_flags |= flags;
124 tracepoint(oprequest, set_rmw_flags, reqid.name._type,
125 reqid.name._num, reqid.tid, reqid.inc,
126 flags, old_rmw_flags, rmw_flags);
127 }
128
129 void OpRequest::set_read() { set_rmw_flags(CEPH_OSD_RMW_FLAG_READ); }
130 void OpRequest::set_write() { set_rmw_flags(CEPH_OSD_RMW_FLAG_WRITE); }
131 void OpRequest::set_class_read() { set_rmw_flags(CEPH_OSD_RMW_FLAG_CLASS_READ); }
132 void OpRequest::set_class_write() { set_rmw_flags(CEPH_OSD_RMW_FLAG_CLASS_WRITE); }
133 void OpRequest::set_pg_op() { set_rmw_flags(CEPH_OSD_RMW_FLAG_PGOP); }
134 void OpRequest::set_cache() { set_rmw_flags(CEPH_OSD_RMW_FLAG_CACHE); }
135 void OpRequest::set_promote() { set_rmw_flags(CEPH_OSD_RMW_FLAG_FORCE_PROMOTE); }
136 void OpRequest::set_skip_handle_cache() { set_rmw_flags(CEPH_OSD_RMW_FLAG_SKIP_HANDLE_CACHE); }
137 void OpRequest::set_skip_promote() { set_rmw_flags(CEPH_OSD_RMW_FLAG_SKIP_PROMOTE); }
138 void OpRequest::set_force_rwordered() { set_rmw_flags(CEPH_OSD_RMW_FLAG_RWORDERED); }
139
140 void OpRequest::mark_flag_point(uint8_t flag, const char *s) {
141 #ifdef WITH_LTTNG
142 uint8_t old_flags = hit_flag_points;
143 #endif
144 mark_event(s);
145 hit_flag_points |= flag;
146 latest_flag_point = flag;
147 tracepoint(oprequest, mark_flag_point, reqid.name._type,
148 reqid.name._num, reqid.tid, reqid.inc, rmw_flags,
149 flag, s, old_flags, hit_flag_points);
150 }
151
152 void OpRequest::mark_flag_point_string(uint8_t flag, const string& s) {
153 #ifdef WITH_LTTNG
154 uint8_t old_flags = hit_flag_points;
155 #endif
156 mark_event_string(s);
157 hit_flag_points |= flag;
158 latest_flag_point = flag;
159 tracepoint(oprequest, mark_flag_point, reqid.name._type,
160 reqid.name._num, reqid.tid, reqid.inc, rmw_flags,
161 flag, s.c_str(), old_flags, hit_flag_points);
162 }
163
164 ostream& operator<<(ostream& out, const OpRequest::ClassInfo& i)
165 {
166 out << "class " << i.name << " rd " << i.read
167 << " wr " << i.write << " wl " << i.whitelisted;
168 return out;
169 }