1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_DISPATCHQUEUE_H
16 #define CEPH_DISPATCHQUEUE_H
20 #include <boost/intrusive_ptr.hpp>
21 #include "include/assert.h"
22 #include "include/xlist.h"
23 #include "common/Mutex.h"
24 #include "common/Cond.h"
25 #include "common/Thread.h"
26 #include "common/PrioritizedQueue.h"
34 * The DispatchQueue contains all the connections which have Messages
35 * they want to be dispatched, carefully organized by Message priority
36 * and permitted to deliver in a round-robin fashion.
37 * See Messenger::dispatch_entry for details.
45 explicit QueueItem(Message
*m
) : type(-1), con(0), m(m
) {}
46 QueueItem(int type
, Connection
*con
) : type(type
), con(con
), m(0) {}
47 bool is_code() const {
50 int get_code () const {
54 Message
*get_message() {
58 Connection
*get_connection() {
69 PrioritizedQueue
<QueueItem
, uint64_t> mqueue
;
71 set
<pair
<double, Message
*> > marrival
;
72 map
<Message
*, set
<pair
<double, Message
*> >::iterator
> marrival_map
;
/// Start tracking m in the arrival containers: inserts the pair
/// (m->get_recv_stamp(), m) into marrival and takes the iterator of the
/// inserted element (the .first of the insert result).
73 void add_arrival(Message
*m
) {
77 marrival
.insert(make_pair(m
->get_recv_stamp(), m
)).first
/// Stop tracking a Message in the arrival containers.
/// i is a marrival_map iterator (presumably the entry for m -- confirm).
/// The function asserts i is not end(), erases the marrival element that
/// i->second points at, and then erases the map entry itself.
81 void remove_arrival(Message
*m
) {
82 map
<Message
*, set
<pair
<double, Message
*> >::iterator
>::iterator i
=
84 assert(i
!= marrival_map
.end());
85 marrival
.erase(i
->second
);
86 marrival_map
.erase(i
);
/// Atomic 64-bit counter.
/// NOTE(review): by its name this hands out the next id; no use of it is
/// visible in this part of the file -- confirm.
std::atomic<uint64_t> next_id;
/// Event codes passed as the `type` of a QueueItem that carries a
/// connection rather than a Message.
enum {
  D_CONNECT = 1,       ///< enqueued by queue_connect()
  D_ACCEPT,            ///< enqueued by queue_accept()
  D_BAD_REMOTE_RESET,  ///< enqueued by queue_remote_reset()
  D_BAD_RESET,         ///< enqueued by queue_reset()
  D_CONN_REFUSED,      ///< enqueued by queue_refused()
  D_NUM_CODES          ///< one greater than the last code above
};
94 * The DispatchThread runs dispatch_entry to empty out the dispatch_queue.
/// Thread subclass bound to a DispatchQueue. The enclosing queue's
/// constructor creates its dispatch_thread member with `this`.
96 class DispatchThread
: public Thread
{
/// @param dq  the DispatchQueue this thread works for; stored in dq
99 explicit DispatchThread(DispatchQueue
*dq
) : dq(dq
) {}
/// Thread entry point (overrides the base-class entry()).
100 void *entry() override
{
106 Mutex local_delivery_lock
;
107 Cond local_delivery_cond
;
108 bool stop_local_delivery
;
109 list
<pair
<Message
*, int> > local_messages
;
/// Thread subclass, instantiated as the local_delivery_thread member, whose
/// entry() calls run_local_delivery() on the DispatchQueue it was
/// constructed with.
110 class LocalDeliveryThread
: public Thread
{
/// @param dq  the DispatchQueue whose run_local_delivery() this thread runs
113 explicit LocalDeliveryThread(DispatchQueue
*dq
) : dq(dq
) {}
/// Thread entry point: hands control to dq->run_local_delivery().
114 void *entry() override
{
115 dq
->run_local_delivery();
118 } local_delivery_thread
;
120 uint64_t pre_dispatch(Message
*m
);
121 void post_dispatch(Message
*m
, uint64_t msize
);
125 /// Throttle preventing us from building up a big backlog waiting for dispatch
126 Throttle dispatch_throttler
;
129 void local_delivery(Message
*m
, int priority
);
130 void run_local_delivery();
132 double get_max_age(utime_t now
) const;
/// Current length of mqueue.
/// Takes `lock` for the duration of the call and returns mqueue.length().
134 int get_queue_len() const {
135 Mutex::Locker
l(lock
);
136 return mqueue
.length();
140 * Release memory accounting back to the dispatch throttler.
142 * @param msize The amount of memory to release.
144 void dispatch_throttle_release(uint64_t msize
);
/// Queue a D_CONNECT event for `con`.
/// While holding `lock`, enqueues QueueItem(D_CONNECT, con) into mqueue
/// with enqueue_strict() at CEPH_MSG_PRIO_HIGHEST.
146 void queue_connect(Connection
*con
) {
147 Mutex::Locker
l(lock
);
150 mqueue
.enqueue_strict(
152 CEPH_MSG_PRIO_HIGHEST
,
153 QueueItem(D_CONNECT
, con
));
/// Queue a D_ACCEPT event for `con`.
/// While holding `lock`, enqueues QueueItem(D_ACCEPT, con) into mqueue
/// with enqueue_strict() at CEPH_MSG_PRIO_HIGHEST.
156 void queue_accept(Connection
*con
) {
157 Mutex::Locker
l(lock
);
160 mqueue
.enqueue_strict(
162 CEPH_MSG_PRIO_HIGHEST
,
163 QueueItem(D_ACCEPT
, con
));
/// Queue a D_BAD_REMOTE_RESET event for `con`.
/// While holding `lock`, enqueues QueueItem(D_BAD_REMOTE_RESET, con) into
/// mqueue with enqueue_strict() at CEPH_MSG_PRIO_HIGHEST.
166 void queue_remote_reset(Connection
*con
) {
167 Mutex::Locker
l(lock
);
170 mqueue
.enqueue_strict(
172 CEPH_MSG_PRIO_HIGHEST
,
173 QueueItem(D_BAD_REMOTE_RESET
, con
));
/// Queue a D_BAD_RESET event for `con`.
/// While holding `lock`, enqueues QueueItem(D_BAD_RESET, con) into mqueue
/// with enqueue_strict() at CEPH_MSG_PRIO_HIGHEST.
176 void queue_reset(Connection
*con
) {
177 Mutex::Locker
l(lock
);
180 mqueue
.enqueue_strict(
182 CEPH_MSG_PRIO_HIGHEST
,
183 QueueItem(D_BAD_RESET
, con
));
/// Queue a D_CONN_REFUSED event for `con`.
/// While holding `lock`, enqueues QueueItem(D_CONN_REFUSED, con) into
/// mqueue with enqueue_strict() at CEPH_MSG_PRIO_HIGHEST.
186 void queue_refused(Connection
*con
) {
187 Mutex::Locker
l(lock
);
190 mqueue
.enqueue_strict(
192 CEPH_MSG_PRIO_HIGHEST
,
193 QueueItem(D_CONN_REFUSED
, con
));
197 bool can_fast_dispatch(const Message
*m
) const;
198 void fast_dispatch(Message
*m
);
199 void fast_preprocess(Message
*m
);
200 void enqueue(Message
*m
, int priority
, uint64_t id
);
201 void discard_queue(uint64_t id
);
202 void discard_local();
210 bool is_started() const {return dispatch_thread
.is_started();}
/**
 * Construct a DispatchQueue.
 *
 * @param cct   context; the mqueue limits (ms_pq_max_tokens_per_priority,
 *              ms_pq_min_cost) and the throttle size
 *              (ms_dispatch_throttle_bytes) are read from cct->_conf
 * @param msgr  stored in the msgr member
 * @param name  appended to both lock names and to the
 *              "msgr_dispatch_throttler-" throttle name
 *
 * dispatch_thread and local_delivery_thread are both constructed with
 * `this`, and stop_local_delivery starts out false.
 */
212 DispatchQueue(CephContext
*cct
, Messenger
*msgr
, string
&name
)
213 : cct(cct
), msgr(msgr
),
214 lock("Messenger::DispatchQueue::lock" + name
),
215 mqueue(cct
->_conf
->ms_pq_max_tokens_per_priority
,
216 cct
->_conf
->ms_pq_min_cost
),
218 dispatch_thread(this),
219 local_delivery_lock("Messenger::DispatchQueue::local_delivery_lock" + name
),
220 stop_local_delivery(false),
221 local_delivery_thread(this),
222 dispatch_throttler(cct
, string("msgr_dispatch_throttler-") + name
,
223 cct
->_conf
->ms_dispatch_throttle_bytes
),
227 assert(mqueue
.empty());
228 assert(marrival
.empty());
229 assert(local_messages
.empty());