1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "msg/Message.h"
16 #include "DispatchQueue.h"
17 #include "Messenger.h"
18 #include "common/ceph_context.h"
20 #define dout_subsys ceph_subsys_ms
21 #include "common/debug.h"
29 #define dout_prefix *_dout << "-- " << msgr->get_myaddrs() << " "
31 double DispatchQueue::get_max_age(utime_t now
) const {
32 std::lock_guard l
{lock
};
36 return (now
- marrival
.begin()->first
);
39 uint64_t DispatchQueue::pre_dispatch(const ref_t
<Message
>& m
)
41 ldout(cct
,1) << "<== " << m
->get_source_inst()
42 << " " << m
->get_seq()
44 << " ==== " << m
->get_payload().length()
45 << "+" << m
->get_middle().length()
46 << "+" << m
->get_data().length()
47 << " (" << ceph_con_mode_name(m
->get_connection()->get_con_mode())
48 << " " << m
->get_footer().front_crc
<< " "
49 << m
->get_footer().middle_crc
50 << " " << m
->get_footer().data_crc
<< ")"
51 << " " << m
<< " con " << m
->get_connection()
53 uint64_t msize
= m
->get_dispatch_throttle_size();
54 m
->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
58 void DispatchQueue::post_dispatch(const ref_t
<Message
>& m
, uint64_t msize
)
60 dispatch_throttle_release(msize
);
61 ldout(cct
,20) << "done calling dispatch on " << m
<< dendl
;
64 bool DispatchQueue::can_fast_dispatch(const cref_t
<Message
> &m
) const
66 return msgr
->ms_can_fast_dispatch(m
);
69 void DispatchQueue::fast_dispatch(const ref_t
<Message
>& m
)
71 uint64_t msize
= pre_dispatch(m
);
72 msgr
->ms_fast_dispatch(m
);
73 post_dispatch(m
, msize
);
76 void DispatchQueue::fast_preprocess(const ref_t
<Message
>& m
)
78 msgr
->ms_fast_preprocess(m
);
81 void DispatchQueue::enqueue(const ref_t
<Message
>& m
, int priority
, uint64_t id
)
83 std::lock_guard l
{lock
};
87 ldout(cct
,20) << "queue " << m
<< " prio " << priority
<< dendl
;
89 if (priority
>= CEPH_MSG_PRIO_LOW
) {
90 mqueue
.enqueue_strict(id
, priority
, QueueItem(m
));
92 mqueue
.enqueue(id
, priority
, m
->get_cost(), QueueItem(m
));
97 void DispatchQueue::local_delivery(const ref_t
<Message
>& m
, int priority
)
99 auto local_delivery_stamp
= ceph_clock_now();
100 m
->set_recv_stamp(local_delivery_stamp
);
101 m
->set_throttle_stamp(local_delivery_stamp
);
102 m
->set_recv_complete_stamp(local_delivery_stamp
);
103 std::lock_guard l
{local_delivery_lock
};
104 if (local_messages
.empty())
105 local_delivery_cond
.notify_all();
106 local_messages
.emplace(m
, priority
);
110 void DispatchQueue::run_local_delivery()
112 std::unique_lock l
{local_delivery_lock
};
114 if (stop_local_delivery
)
116 if (local_messages
.empty()) {
117 local_delivery_cond
.wait(l
);
120 auto p
= std::move(local_messages
.front());
121 local_messages
.pop();
123 const ref_t
<Message
>& m
= p
.first
;
124 int priority
= p
.second
;
126 if (can_fast_dispatch(m
)) {
129 enqueue(m
, priority
, 0);
135 void DispatchQueue::dispatch_throttle_release(uint64_t msize
)
138 ldout(cct
,10) << __func__
<< " " << msize
<< " to dispatch throttler "
139 << dispatch_throttler
.get_current() << "/"
140 << dispatch_throttler
.get_max() << dendl
;
141 dispatch_throttler
.put(msize
);
146 * This function delivers incoming messages to the Messenger.
147 * Connections with messages are kept in queues; when beginning a message
148 * delivery the highest-priority queue is selected, the connection from the
149 * front of the queue is removed, and its message read. If the connection
150 * has remaining messages at that priority level, it is re-placed on to the
151 * end of the queue. If the queue is empty; it's removed.
152 * The message is then delivered and the process starts again.
154 void DispatchQueue::entry()
156 std::unique_lock l
{lock
};
158 while (!mqueue
.empty()) {
159 QueueItem qitem
= mqueue
.dequeue();
160 if (!qitem
.is_code())
161 remove_arrival(qitem
.get_message());
164 if (qitem
.is_code()) {
165 if (cct
->_conf
->ms_inject_internal_delays
&&
166 cct
->_conf
->ms_inject_delay_probability
&&
167 (rand() % 10000)/10000.0 < cct
->_conf
->ms_inject_delay_probability
) {
169 t
.set_from_double(cct
->_conf
->ms_inject_internal_delays
);
170 ldout(cct
, 1) << "DispatchQueue::entry inject delay of " << t
174 switch (qitem
.get_code()) {
175 case D_BAD_REMOTE_RESET
:
176 msgr
->ms_deliver_handle_remote_reset(qitem
.get_connection());
179 msgr
->ms_deliver_handle_connect(qitem
.get_connection());
182 msgr
->ms_deliver_handle_accept(qitem
.get_connection());
185 msgr
->ms_deliver_handle_reset(qitem
.get_connection());
188 msgr
->ms_deliver_handle_refused(qitem
.get_connection());
194 const ref_t
<Message
>& m
= qitem
.get_message();
196 ldout(cct
,10) << " stop flag set, discarding " << m
<< " " << *m
<< dendl
;
198 uint64_t msize
= pre_dispatch(m
);
199 msgr
->ms_deliver_dispatch(m
);
200 post_dispatch(m
, msize
);
209 // wait for something to be put on queue
214 void DispatchQueue::discard_queue(uint64_t id
) {
215 std::lock_guard l
{lock
};
216 list
<QueueItem
> removed
;
217 mqueue
.remove_by_class(id
, &removed
);
218 for (list
<QueueItem
>::iterator i
= removed
.begin();
221 ceph_assert(!(i
->is_code())); // We don't discard id 0, ever!
222 const ref_t
<Message
>& m
= i
->get_message();
224 dispatch_throttle_release(m
->get_dispatch_throttle_size());
228 void DispatchQueue::start()
231 ceph_assert(!dispatch_thread
.is_started());
232 dispatch_thread
.create("ms_dispatch");
233 local_delivery_thread
.create("ms_local");
236 void DispatchQueue::wait()
238 local_delivery_thread
.join();
239 dispatch_thread
.join();
242 void DispatchQueue::discard_local()
244 decltype(local_messages
)().swap(local_messages
);
247 void DispatchQueue::shutdown()
249 // stop my local delivery thread
251 std::scoped_lock l
{local_delivery_lock
};
252 stop_local_delivery
= true;
253 local_delivery_cond
.notify_all();
255 // stop my dispatch thread
257 std::scoped_lock l
{lock
};