]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/DispatchQueue.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / msg / DispatchQueue.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "msg/Message.h"
16 #include "DispatchQueue.h"
17 #include "Messenger.h"
18 #include "common/ceph_context.h"
19
20 #define dout_subsys ceph_subsys_ms
21 #include "common/debug.h"
22
23
24 /*******************
25 * DispatchQueue
26 */
27
28 #undef dout_prefix
29 #define dout_prefix *_dout << "-- " << msgr->get_myaddrs() << " "
30
31 double DispatchQueue::get_max_age(utime_t now) const {
32 std::lock_guard l{lock};
33 if (marrival.empty())
34 return 0;
35 else
36 return (now - marrival.begin()->first);
37 }
38
39 uint64_t DispatchQueue::pre_dispatch(const ref_t<Message>& m)
40 {
41 ldout(cct,1) << "<== " << m->get_source_inst()
42 << " " << m->get_seq()
43 << " ==== " << *m
44 << " ==== " << m->get_payload().length()
45 << "+" << m->get_middle().length()
46 << "+" << m->get_data().length()
47 << " (" << ceph_con_mode_name(m->get_connection()->get_con_mode())
48 << " " << m->get_footer().front_crc << " "
49 << m->get_footer().middle_crc
50 << " " << m->get_footer().data_crc << ")"
51 << " " << m << " con " << m->get_connection()
52 << dendl;
53 uint64_t msize = m->get_dispatch_throttle_size();
54 m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
55 return msize;
56 }
57
58 void DispatchQueue::post_dispatch(const ref_t<Message>& m, uint64_t msize)
59 {
60 dispatch_throttle_release(msize);
61 ldout(cct,20) << "done calling dispatch on " << m << dendl;
62 }
63
64 bool DispatchQueue::can_fast_dispatch(const cref_t<Message> &m) const
65 {
66 return msgr->ms_can_fast_dispatch(m);
67 }
68
69 void DispatchQueue::fast_dispatch(const ref_t<Message>& m)
70 {
71 uint64_t msize = pre_dispatch(m);
72 msgr->ms_fast_dispatch(m);
73 post_dispatch(m, msize);
74 }
75
76 void DispatchQueue::fast_preprocess(const ref_t<Message>& m)
77 {
78 msgr->ms_fast_preprocess(m);
79 }
80
81 void DispatchQueue::enqueue(const ref_t<Message>& m, int priority, uint64_t id)
82 {
83 std::lock_guard l{lock};
84 if (stop) {
85 return;
86 }
87 ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
88 add_arrival(m);
89 if (priority >= CEPH_MSG_PRIO_LOW) {
90 mqueue.enqueue_strict(id, priority, QueueItem(m));
91 } else {
92 mqueue.enqueue(id, priority, m->get_cost(), QueueItem(m));
93 }
94 cond.notify_all();
95 }
96
97 void DispatchQueue::local_delivery(const ref_t<Message>& m, int priority)
98 {
99 auto local_delivery_stamp = ceph_clock_now();
100 m->set_recv_stamp(local_delivery_stamp);
101 m->set_throttle_stamp(local_delivery_stamp);
102 m->set_recv_complete_stamp(local_delivery_stamp);
103 std::lock_guard l{local_delivery_lock};
104 if (local_messages.empty())
105 local_delivery_cond.notify_all();
106 local_messages.emplace(m, priority);
107 return;
108 }
109
110 void DispatchQueue::run_local_delivery()
111 {
112 std::unique_lock l{local_delivery_lock};
113 while (true) {
114 if (stop_local_delivery)
115 break;
116 if (local_messages.empty()) {
117 local_delivery_cond.wait(l);
118 continue;
119 }
120 auto p = std::move(local_messages.front());
121 local_messages.pop();
122 l.unlock();
123 const ref_t<Message>& m = p.first;
124 int priority = p.second;
125 fast_preprocess(m);
126 if (can_fast_dispatch(m)) {
127 fast_dispatch(m);
128 } else {
129 enqueue(m, priority, 0);
130 }
131 l.lock();
132 }
133 }
134
135 void DispatchQueue::dispatch_throttle_release(uint64_t msize)
136 {
137 if (msize) {
138 ldout(cct,10) << __func__ << " " << msize << " to dispatch throttler "
139 << dispatch_throttler.get_current() << "/"
140 << dispatch_throttler.get_max() << dendl;
141 dispatch_throttler.put(msize);
142 }
143 }
144
145 /*
146 * This function delivers incoming messages to the Messenger.
147 * Connections with messages are kept in queues; when beginning a message
148 * delivery the highest-priority queue is selected, the connection from the
149 * front of the queue is removed, and its message read. If the connection
150 * has remaining messages at that priority level, it is re-placed on to the
151 * end of the queue. If the queue is empty; it's removed.
152 * The message is then delivered and the process starts again.
153 */
154 void DispatchQueue::entry()
155 {
156 std::unique_lock l{lock};
157 while (true) {
158 while (!mqueue.empty()) {
159 QueueItem qitem = mqueue.dequeue();
160 if (!qitem.is_code())
161 remove_arrival(qitem.get_message());
162 l.unlock();
163
164 if (qitem.is_code()) {
165 if (cct->_conf->ms_inject_internal_delays &&
166 cct->_conf->ms_inject_delay_probability &&
167 (rand() % 10000)/10000.0 < cct->_conf->ms_inject_delay_probability) {
168 utime_t t;
169 t.set_from_double(cct->_conf->ms_inject_internal_delays);
170 ldout(cct, 1) << "DispatchQueue::entry inject delay of " << t
171 << dendl;
172 t.sleep();
173 }
174 switch (qitem.get_code()) {
175 case D_BAD_REMOTE_RESET:
176 msgr->ms_deliver_handle_remote_reset(qitem.get_connection());
177 break;
178 case D_CONNECT:
179 msgr->ms_deliver_handle_connect(qitem.get_connection());
180 break;
181 case D_ACCEPT:
182 msgr->ms_deliver_handle_accept(qitem.get_connection());
183 break;
184 case D_BAD_RESET:
185 msgr->ms_deliver_handle_reset(qitem.get_connection());
186 break;
187 case D_CONN_REFUSED:
188 msgr->ms_deliver_handle_refused(qitem.get_connection());
189 break;
190 default:
191 ceph_abort();
192 }
193 } else {
194 const ref_t<Message>& m = qitem.get_message();
195 if (stop) {
196 ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl;
197 } else {
198 uint64_t msize = pre_dispatch(m);
199 msgr->ms_deliver_dispatch(m);
200 post_dispatch(m, msize);
201 }
202 }
203
204 l.lock();
205 }
206 if (stop)
207 break;
208
209 // wait for something to be put on queue
210 cond.wait(l);
211 }
212 }
213
214 void DispatchQueue::discard_queue(uint64_t id) {
215 std::lock_guard l{lock};
216 list<QueueItem> removed;
217 mqueue.remove_by_class(id, &removed);
218 for (list<QueueItem>::iterator i = removed.begin();
219 i != removed.end();
220 ++i) {
221 ceph_assert(!(i->is_code())); // We don't discard id 0, ever!
222 const ref_t<Message>& m = i->get_message();
223 remove_arrival(m);
224 dispatch_throttle_release(m->get_dispatch_throttle_size());
225 }
226 }
227
228 void DispatchQueue::start()
229 {
230 ceph_assert(!stop);
231 ceph_assert(!dispatch_thread.is_started());
232 dispatch_thread.create("ms_dispatch");
233 local_delivery_thread.create("ms_local");
234 }
235
236 void DispatchQueue::wait()
237 {
238 local_delivery_thread.join();
239 dispatch_thread.join();
240 }
241
242 void DispatchQueue::discard_local()
243 {
244 decltype(local_messages)().swap(local_messages);
245 }
246
247 void DispatchQueue::shutdown()
248 {
249 // stop my local delivery thread
250 {
251 std::scoped_lock l{local_delivery_lock};
252 stop_local_delivery = true;
253 local_delivery_cond.notify_all();
254 }
255 // stop my dispatch thread
256 {
257 std::scoped_lock l{lock};
258 stop = true;
259 cond.notify_all();
260 }
261 }