]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "msg/Message.h" | |
16 | #include "DispatchQueue.h" | |
17 | #include "Messenger.h" | |
18 | #include "common/ceph_context.h" | |
19 | ||
20 | #define dout_subsys ceph_subsys_ms | |
21 | #include "common/debug.h" | |
22 | ||
23 | ||
24 | /******************* | |
25 | * DispatchQueue | |
26 | */ | |
27 | ||
28 | #undef dout_prefix | |
29 | #define dout_prefix *_dout << "-- " << msgr->get_myaddr() << " " | |
30 | ||
31 | double DispatchQueue::get_max_age(utime_t now) const { | |
32 | Mutex::Locker l(lock); | |
33 | if (marrival.empty()) | |
34 | return 0; | |
35 | else | |
36 | return (now - marrival.begin()->first); | |
37 | } | |
38 | ||
39 | uint64_t DispatchQueue::pre_dispatch(Message *m) | |
40 | { | |
41 | ldout(cct,1) << "<== " << m->get_source_inst() | |
42 | << " " << m->get_seq() | |
43 | << " ==== " << *m | |
44 | << " ==== " << m->get_payload().length() | |
45 | << "+" << m->get_middle().length() | |
46 | << "+" << m->get_data().length() | |
47 | << " (" << m->get_footer().front_crc << " " | |
48 | << m->get_footer().middle_crc | |
49 | << " " << m->get_footer().data_crc << ")" | |
50 | << " " << m << " con " << m->get_connection() | |
51 | << dendl; | |
52 | uint64_t msize = m->get_dispatch_throttle_size(); | |
53 | m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message. | |
54 | return msize; | |
55 | } | |
56 | ||
57 | void DispatchQueue::post_dispatch(Message *m, uint64_t msize) | |
58 | { | |
59 | dispatch_throttle_release(msize); | |
60 | ldout(cct,20) << "done calling dispatch on " << m << dendl; | |
61 | } | |
62 | ||
63 | bool DispatchQueue::can_fast_dispatch(const Message *m) const | |
64 | { | |
65 | return msgr->ms_can_fast_dispatch(m); | |
66 | } | |
67 | ||
68 | void DispatchQueue::fast_dispatch(Message *m) | |
69 | { | |
70 | uint64_t msize = pre_dispatch(m); | |
71 | msgr->ms_fast_dispatch(m); | |
72 | post_dispatch(m, msize); | |
73 | } | |
74 | ||
75 | void DispatchQueue::fast_preprocess(Message *m) | |
76 | { | |
77 | msgr->ms_fast_preprocess(m); | |
78 | } | |
79 | ||
80 | void DispatchQueue::enqueue(Message *m, int priority, uint64_t id) | |
81 | { | |
82 | ||
83 | Mutex::Locker l(lock); | |
84 | ldout(cct,20) << "queue " << m << " prio " << priority << dendl; | |
85 | add_arrival(m); | |
86 | if (priority >= CEPH_MSG_PRIO_LOW) { | |
87 | mqueue.enqueue_strict( | |
88 | id, priority, QueueItem(m)); | |
89 | } else { | |
90 | mqueue.enqueue( | |
91 | id, priority, m->get_cost(), QueueItem(m)); | |
92 | } | |
93 | cond.Signal(); | |
94 | } | |
95 | ||
96 | void DispatchQueue::local_delivery(Message *m, int priority) | |
97 | { | |
98 | m->set_recv_stamp(ceph_clock_now()); | |
99 | Mutex::Locker l(local_delivery_lock); | |
100 | if (local_messages.empty()) | |
101 | local_delivery_cond.Signal(); | |
102 | local_messages.push_back(make_pair(m, priority)); | |
103 | return; | |
104 | } | |
105 | ||
106 | void DispatchQueue::run_local_delivery() | |
107 | { | |
108 | local_delivery_lock.Lock(); | |
109 | while (true) { | |
110 | if (stop_local_delivery) | |
111 | break; | |
112 | if (local_messages.empty()) { | |
113 | local_delivery_cond.Wait(local_delivery_lock); | |
114 | continue; | |
115 | } | |
116 | pair<Message *, int> mp = local_messages.front(); | |
117 | local_messages.pop_front(); | |
118 | local_delivery_lock.Unlock(); | |
119 | Message *m = mp.first; | |
120 | int priority = mp.second; | |
121 | fast_preprocess(m); | |
122 | if (can_fast_dispatch(m)) { | |
123 | fast_dispatch(m); | |
124 | } else { | |
125 | enqueue(m, priority, 0); | |
126 | } | |
127 | local_delivery_lock.Lock(); | |
128 | } | |
129 | local_delivery_lock.Unlock(); | |
130 | } | |
131 | ||
132 | void DispatchQueue::dispatch_throttle_release(uint64_t msize) | |
133 | { | |
134 | if (msize) { | |
135 | ldout(cct,10) << __func__ << " " << msize << " to dispatch throttler " | |
136 | << dispatch_throttler.get_current() << "/" | |
137 | << dispatch_throttler.get_max() << dendl; | |
138 | dispatch_throttler.put(msize); | |
139 | } | |
140 | } | |
141 | ||
142 | /* | |
143 | * This function delivers incoming messages to the Messenger. | |
144 | * Connections with messages are kept in queues; when beginning a message | |
145 | * delivery the highest-priority queue is selected, the connection from the | |
146 | * front of the queue is removed, and its message read. If the connection | |
147 | * has remaining messages at that priority level, it is re-placed on to the | |
148 | * end of the queue. If the queue is empty, it's removed. | |
149 | * The message is then delivered and the process starts again. | |
150 | */ | |
151 | void DispatchQueue::entry() | |
152 | { | |
153 | lock.Lock(); | |
154 | while (true) { | |
155 | while (!mqueue.empty()) { | |
156 | QueueItem qitem = mqueue.dequeue(); | |
157 | if (!qitem.is_code()) | |
158 | remove_arrival(qitem.get_message()); | |
159 | lock.Unlock(); | |
160 | ||
161 | if (qitem.is_code()) { | |
162 | if (cct->_conf->ms_inject_internal_delays && | |
163 | cct->_conf->ms_inject_delay_probability && | |
164 | (rand() % 10000)/10000.0 < cct->_conf->ms_inject_delay_probability) { | |
165 | utime_t t; | |
166 | t.set_from_double(cct->_conf->ms_inject_internal_delays); | |
167 | ldout(cct, 1) << "DispatchQueue::entry inject delay of " << t | |
168 | << dendl; | |
169 | t.sleep(); | |
170 | } | |
171 | switch (qitem.get_code()) { | |
172 | case D_BAD_REMOTE_RESET: | |
173 | msgr->ms_deliver_handle_remote_reset(qitem.get_connection()); | |
174 | break; | |
175 | case D_CONNECT: | |
176 | msgr->ms_deliver_handle_connect(qitem.get_connection()); | |
177 | break; | |
178 | case D_ACCEPT: | |
179 | msgr->ms_deliver_handle_accept(qitem.get_connection()); | |
180 | break; | |
181 | case D_BAD_RESET: | |
182 | msgr->ms_deliver_handle_reset(qitem.get_connection()); | |
183 | break; | |
184 | case D_CONN_REFUSED: | |
185 | msgr->ms_deliver_handle_refused(qitem.get_connection()); | |
186 | break; | |
187 | default: | |
188 | ceph_abort(); | |
189 | } | |
190 | } else { | |
191 | Message *m = qitem.get_message(); | |
192 | if (stop) { | |
193 | ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl; | |
194 | m->put(); | |
195 | } else { | |
196 | uint64_t msize = pre_dispatch(m); | |
197 | msgr->ms_deliver_dispatch(m); | |
198 | post_dispatch(m, msize); | |
199 | } | |
200 | } | |
201 | ||
202 | lock.Lock(); | |
203 | } | |
204 | if (stop) | |
205 | break; | |
206 | ||
207 | // wait for something to be put on queue | |
208 | cond.Wait(lock); | |
209 | } | |
210 | lock.Unlock(); | |
211 | } | |
212 | ||
213 | void DispatchQueue::discard_queue(uint64_t id) { | |
214 | Mutex::Locker l(lock); | |
215 | list<QueueItem> removed; | |
216 | mqueue.remove_by_class(id, &removed); | |
217 | for (list<QueueItem>::iterator i = removed.begin(); | |
218 | i != removed.end(); | |
219 | ++i) { | |
220 | assert(!(i->is_code())); // We don't discard id 0, ever! | |
221 | Message *m = i->get_message(); | |
222 | remove_arrival(m); | |
223 | dispatch_throttle_release(m->get_dispatch_throttle_size()); | |
224 | m->put(); | |
225 | } | |
226 | } | |
227 | ||
228 | void DispatchQueue::start() | |
229 | { | |
230 | assert(!stop); | |
231 | assert(!dispatch_thread.is_started()); | |
232 | dispatch_thread.create("ms_dispatch"); | |
233 | local_delivery_thread.create("ms_local"); | |
234 | } | |
235 | ||
236 | void DispatchQueue::wait() | |
237 | { | |
238 | local_delivery_thread.join(); | |
239 | dispatch_thread.join(); | |
240 | } | |
241 | ||
242 | void DispatchQueue::discard_local() | |
243 | { | |
244 | for (list<pair<Message *, int> >::iterator p = local_messages.begin(); | |
245 | p != local_messages.end(); | |
246 | ++p) { | |
247 | ldout(cct,20) << __func__ << " " << p->first << dendl; | |
248 | p->first->put(); | |
249 | } | |
250 | local_messages.clear(); | |
251 | } | |
252 | ||
253 | void DispatchQueue::shutdown() | |
254 | { | |
255 | // stop my local delivery thread | |
256 | local_delivery_lock.Lock(); | |
257 | stop_local_delivery = true; | |
258 | local_delivery_cond.Signal(); | |
259 | local_delivery_lock.Unlock(); | |
260 | ||
261 | // stop my dispatch thread | |
262 | lock.Lock(); | |
263 | stop = true; | |
264 | cond.Signal(); | |
265 | lock.Unlock(); | |
266 | } |