]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef CEPH_DISPATCHQUEUE_H | |
16 | #define CEPH_DISPATCHQUEUE_H | |
17 | ||
18 | #include <atomic> | |
19 | #include <map> | |
20 | #include <boost/intrusive_ptr.hpp> | |
21 | #include "include/assert.h" | |
22 | #include "include/xlist.h" | |
7c673cae FG |
23 | #include "common/Mutex.h" |
24 | #include "common/Cond.h" | |
25 | #include "common/Thread.h" | |
26 | #include "common/PrioritizedQueue.h" | |
27 | ||
28 | class CephContext; | |
29 | class Messenger; | |
30 | class Message; | |
31 | struct Connection; | |
32 | ||
33 | /** | |
34 | * The DispatchQueue contains all the connections which have Messages | |
35 | * they want to be dispatched, carefully organized by Message priority | |
36 | * and permitted to deliver in a round-robin fashion. | |
37 | * See Messenger::dispatch_entry for details. | |
38 | */ | |
39 | class DispatchQueue { | |
40 | class QueueItem { | |
41 | int type; | |
42 | ConnectionRef con; | |
43 | MessageRef m; | |
44 | public: | |
45 | explicit QueueItem(Message *m) : type(-1), con(0), m(m) {} | |
46 | QueueItem(int type, Connection *con) : type(type), con(con), m(0) {} | |
47 | bool is_code() const { | |
48 | return type != -1; | |
49 | } | |
50 | int get_code () const { | |
51 | assert(is_code()); | |
52 | return type; | |
53 | } | |
54 | Message *get_message() { | |
55 | assert(!is_code()); | |
56 | return m.get(); | |
57 | } | |
58 | Connection *get_connection() { | |
59 | assert(is_code()); | |
60 | return con.get(); | |
61 | } | |
62 | }; | |
63 | ||
64 | CephContext *cct; | |
65 | Messenger *msgr; | |
66 | mutable Mutex lock; | |
67 | Cond cond; | |
68 | ||
69 | PrioritizedQueue<QueueItem, uint64_t> mqueue; | |
70 | ||
71 | set<pair<double, Message*> > marrival; | |
72 | map<Message *, set<pair<double, Message*> >::iterator> marrival_map; | |
73 | void add_arrival(Message *m) { | |
74 | marrival_map.insert( | |
75 | make_pair( | |
76 | m, | |
77 | marrival.insert(make_pair(m->get_recv_stamp(), m)).first | |
78 | ) | |
79 | ); | |
80 | } | |
81 | void remove_arrival(Message *m) { | |
82 | map<Message *, set<pair<double, Message*> >::iterator>::iterator i = | |
83 | marrival_map.find(m); | |
84 | assert(i != marrival_map.end()); | |
85 | marrival.erase(i->second); | |
86 | marrival_map.erase(i); | |
87 | } | |
88 | ||
89 | std::atomic<uint64_t> next_id; | |
90 | ||
91 | enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_CONN_REFUSED, D_NUM_CODES }; | |
92 | ||
93 | /** | |
94 | * The DispatchThread runs dispatch_entry to empty out the dispatch_queue. | |
95 | */ | |
96 | class DispatchThread : public Thread { | |
97 | DispatchQueue *dq; | |
98 | public: | |
99 | explicit DispatchThread(DispatchQueue *dq) : dq(dq) {} | |
100 | void *entry() override { | |
101 | dq->entry(); | |
102 | return 0; | |
103 | } | |
104 | } dispatch_thread; | |
105 | ||
106 | Mutex local_delivery_lock; | |
107 | Cond local_delivery_cond; | |
108 | bool stop_local_delivery; | |
109 | list<pair<Message *, int> > local_messages; | |
110 | class LocalDeliveryThread : public Thread { | |
111 | DispatchQueue *dq; | |
112 | public: | |
113 | explicit LocalDeliveryThread(DispatchQueue *dq) : dq(dq) {} | |
114 | void *entry() override { | |
115 | dq->run_local_delivery(); | |
116 | return 0; | |
117 | } | |
118 | } local_delivery_thread; | |
119 | ||
120 | uint64_t pre_dispatch(Message *m); | |
121 | void post_dispatch(Message *m, uint64_t msize); | |
122 | ||
123 | public: | |
124 | ||
125 | /// Throttle preventing us from building up a big backlog waiting for dispatch | |
126 | Throttle dispatch_throttler; | |
127 | ||
128 | bool stop; | |
129 | void local_delivery(Message *m, int priority); | |
130 | void run_local_delivery(); | |
131 | ||
132 | double get_max_age(utime_t now) const; | |
133 | ||
134 | int get_queue_len() const { | |
135 | Mutex::Locker l(lock); | |
136 | return mqueue.length(); | |
137 | } | |
138 | ||
139 | /** | |
140 | * Release memory accounting back to the dispatch throttler. | |
141 | * | |
142 | * @param msize The amount of memory to release. | |
143 | */ | |
144 | void dispatch_throttle_release(uint64_t msize); | |
145 | ||
146 | void queue_connect(Connection *con) { | |
147 | Mutex::Locker l(lock); | |
148 | if (stop) | |
149 | return; | |
150 | mqueue.enqueue_strict( | |
151 | 0, | |
152 | CEPH_MSG_PRIO_HIGHEST, | |
153 | QueueItem(D_CONNECT, con)); | |
154 | cond.Signal(); | |
155 | } | |
156 | void queue_accept(Connection *con) { | |
157 | Mutex::Locker l(lock); | |
158 | if (stop) | |
159 | return; | |
160 | mqueue.enqueue_strict( | |
161 | 0, | |
162 | CEPH_MSG_PRIO_HIGHEST, | |
163 | QueueItem(D_ACCEPT, con)); | |
164 | cond.Signal(); | |
165 | } | |
166 | void queue_remote_reset(Connection *con) { | |
167 | Mutex::Locker l(lock); | |
168 | if (stop) | |
169 | return; | |
170 | mqueue.enqueue_strict( | |
171 | 0, | |
172 | CEPH_MSG_PRIO_HIGHEST, | |
173 | QueueItem(D_BAD_REMOTE_RESET, con)); | |
174 | cond.Signal(); | |
175 | } | |
176 | void queue_reset(Connection *con) { | |
177 | Mutex::Locker l(lock); | |
178 | if (stop) | |
179 | return; | |
180 | mqueue.enqueue_strict( | |
181 | 0, | |
182 | CEPH_MSG_PRIO_HIGHEST, | |
183 | QueueItem(D_BAD_RESET, con)); | |
184 | cond.Signal(); | |
185 | } | |
186 | void queue_refused(Connection *con) { | |
187 | Mutex::Locker l(lock); | |
188 | if (stop) | |
189 | return; | |
190 | mqueue.enqueue_strict( | |
191 | 0, | |
192 | CEPH_MSG_PRIO_HIGHEST, | |
193 | QueueItem(D_CONN_REFUSED, con)); | |
194 | cond.Signal(); | |
195 | } | |
196 | ||
197 | bool can_fast_dispatch(const Message *m) const; | |
198 | void fast_dispatch(Message *m); | |
199 | void fast_preprocess(Message *m); | |
200 | void enqueue(Message *m, int priority, uint64_t id); | |
201 | void discard_queue(uint64_t id); | |
202 | void discard_local(); | |
203 | uint64_t get_id() { | |
204 | return next_id++; | |
205 | } | |
206 | void start(); | |
207 | void entry(); | |
208 | void wait(); | |
209 | void shutdown(); | |
210 | bool is_started() const {return dispatch_thread.is_started();} | |
211 | ||
212 | DispatchQueue(CephContext *cct, Messenger *msgr, string &name) | |
213 | : cct(cct), msgr(msgr), | |
214 | lock("Messenger::DispatchQueue::lock" + name), | |
215 | mqueue(cct->_conf->ms_pq_max_tokens_per_priority, | |
216 | cct->_conf->ms_pq_min_cost), | |
217 | next_id(1), | |
218 | dispatch_thread(this), | |
219 | local_delivery_lock("Messenger::DispatchQueue::local_delivery_lock" + name), | |
220 | stop_local_delivery(false), | |
221 | local_delivery_thread(this), | |
222 | dispatch_throttler(cct, string("msgr_dispatch_throttler-") + name, | |
223 | cct->_conf->ms_dispatch_throttle_bytes), | |
224 | stop(false) | |
225 | {} | |
226 | ~DispatchQueue() { | |
227 | assert(mqueue.empty()); | |
228 | assert(marrival.empty()); | |
229 | assert(local_messages.empty()); | |
230 | } | |
231 | }; | |
232 | ||
233 | #endif |