]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/DispatchQueue.h
update sources to v12.1.0
[ceph.git] / ceph / src / msg / DispatchQueue.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#ifndef CEPH_DISPATCHQUEUE_H
16#define CEPH_DISPATCHQUEUE_H
17
18#include <atomic>
19#include <map>
20#include <boost/intrusive_ptr.hpp>
21#include "include/assert.h"
22#include "include/xlist.h"
7c673cae
FG
23#include "common/Mutex.h"
24#include "common/Cond.h"
25#include "common/Thread.h"
26#include "common/PrioritizedQueue.h"
27
28class CephContext;
29class Messenger;
30class Message;
31struct Connection;
32
33/**
34 * The DispatchQueue contains all the connections which have Messages
35 * they want to be dispatched, carefully organized by Message priority
36 * and permitted to deliver in a round-robin fashion.
37 * See Messenger::dispatch_entry for details.
38 */
39class DispatchQueue {
40 class QueueItem {
41 int type;
42 ConnectionRef con;
43 MessageRef m;
44 public:
45 explicit QueueItem(Message *m) : type(-1), con(0), m(m) {}
46 QueueItem(int type, Connection *con) : type(type), con(con), m(0) {}
47 bool is_code() const {
48 return type != -1;
49 }
50 int get_code () const {
51 assert(is_code());
52 return type;
53 }
54 Message *get_message() {
55 assert(!is_code());
56 return m.get();
57 }
58 Connection *get_connection() {
59 assert(is_code());
60 return con.get();
61 }
62 };
63
64 CephContext *cct;
65 Messenger *msgr;
66 mutable Mutex lock;
67 Cond cond;
68
69 PrioritizedQueue<QueueItem, uint64_t> mqueue;
70
71 set<pair<double, Message*> > marrival;
72 map<Message *, set<pair<double, Message*> >::iterator> marrival_map;
73 void add_arrival(Message *m) {
74 marrival_map.insert(
75 make_pair(
76 m,
77 marrival.insert(make_pair(m->get_recv_stamp(), m)).first
78 )
79 );
80 }
81 void remove_arrival(Message *m) {
82 map<Message *, set<pair<double, Message*> >::iterator>::iterator i =
83 marrival_map.find(m);
84 assert(i != marrival_map.end());
85 marrival.erase(i->second);
86 marrival_map.erase(i);
87 }
88
89 std::atomic<uint64_t> next_id;
90
91 enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_CONN_REFUSED, D_NUM_CODES };
92
93 /**
94 * The DispatchThread runs dispatch_entry to empty out the dispatch_queue.
95 */
96 class DispatchThread : public Thread {
97 DispatchQueue *dq;
98 public:
99 explicit DispatchThread(DispatchQueue *dq) : dq(dq) {}
100 void *entry() override {
101 dq->entry();
102 return 0;
103 }
104 } dispatch_thread;
105
106 Mutex local_delivery_lock;
107 Cond local_delivery_cond;
108 bool stop_local_delivery;
109 list<pair<Message *, int> > local_messages;
110 class LocalDeliveryThread : public Thread {
111 DispatchQueue *dq;
112 public:
113 explicit LocalDeliveryThread(DispatchQueue *dq) : dq(dq) {}
114 void *entry() override {
115 dq->run_local_delivery();
116 return 0;
117 }
118 } local_delivery_thread;
119
120 uint64_t pre_dispatch(Message *m);
121 void post_dispatch(Message *m, uint64_t msize);
122
123 public:
124
125 /// Throttle preventing us from building up a big backlog waiting for dispatch
126 Throttle dispatch_throttler;
127
128 bool stop;
129 void local_delivery(Message *m, int priority);
130 void run_local_delivery();
131
132 double get_max_age(utime_t now) const;
133
134 int get_queue_len() const {
135 Mutex::Locker l(lock);
136 return mqueue.length();
137 }
138
139 /**
140 * Release memory accounting back to the dispatch throttler.
141 *
142 * @param msize The amount of memory to release.
143 */
144 void dispatch_throttle_release(uint64_t msize);
145
146 void queue_connect(Connection *con) {
147 Mutex::Locker l(lock);
148 if (stop)
149 return;
150 mqueue.enqueue_strict(
151 0,
152 CEPH_MSG_PRIO_HIGHEST,
153 QueueItem(D_CONNECT, con));
154 cond.Signal();
155 }
156 void queue_accept(Connection *con) {
157 Mutex::Locker l(lock);
158 if (stop)
159 return;
160 mqueue.enqueue_strict(
161 0,
162 CEPH_MSG_PRIO_HIGHEST,
163 QueueItem(D_ACCEPT, con));
164 cond.Signal();
165 }
166 void queue_remote_reset(Connection *con) {
167 Mutex::Locker l(lock);
168 if (stop)
169 return;
170 mqueue.enqueue_strict(
171 0,
172 CEPH_MSG_PRIO_HIGHEST,
173 QueueItem(D_BAD_REMOTE_RESET, con));
174 cond.Signal();
175 }
176 void queue_reset(Connection *con) {
177 Mutex::Locker l(lock);
178 if (stop)
179 return;
180 mqueue.enqueue_strict(
181 0,
182 CEPH_MSG_PRIO_HIGHEST,
183 QueueItem(D_BAD_RESET, con));
184 cond.Signal();
185 }
186 void queue_refused(Connection *con) {
187 Mutex::Locker l(lock);
188 if (stop)
189 return;
190 mqueue.enqueue_strict(
191 0,
192 CEPH_MSG_PRIO_HIGHEST,
193 QueueItem(D_CONN_REFUSED, con));
194 cond.Signal();
195 }
196
197 bool can_fast_dispatch(const Message *m) const;
198 void fast_dispatch(Message *m);
199 void fast_preprocess(Message *m);
200 void enqueue(Message *m, int priority, uint64_t id);
201 void discard_queue(uint64_t id);
202 void discard_local();
203 uint64_t get_id() {
204 return next_id++;
205 }
206 void start();
207 void entry();
208 void wait();
209 void shutdown();
210 bool is_started() const {return dispatch_thread.is_started();}
211
212 DispatchQueue(CephContext *cct, Messenger *msgr, string &name)
213 : cct(cct), msgr(msgr),
214 lock("Messenger::DispatchQueue::lock" + name),
215 mqueue(cct->_conf->ms_pq_max_tokens_per_priority,
216 cct->_conf->ms_pq_min_cost),
217 next_id(1),
218 dispatch_thread(this),
219 local_delivery_lock("Messenger::DispatchQueue::local_delivery_lock" + name),
220 stop_local_delivery(false),
221 local_delivery_thread(this),
222 dispatch_throttler(cct, string("msgr_dispatch_throttler-") + name,
223 cct->_conf->ms_dispatch_throttle_bytes),
224 stop(false)
225 {}
226 ~DispatchQueue() {
227 assert(mqueue.empty());
228 assert(marrival.empty());
229 assert(local_messages.empty());
230 }
231};
232
233#endif