]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/Event.h
update sources to v12.1.0
[ceph.git] / ceph / src / msg / async / Event.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_MSG_EVENT_H
18 #define CEPH_MSG_EVENT_H
19
20 #ifdef __APPLE__
21 #include <AvailabilityMacros.h>
22 #endif
23
// Select the event notification backend(s) available on this platform.
// Preference, in descending order of performance: epoll, kqueue, evport,
// select.
#ifdef __linux__
#define HAVE_EPOLL 1
#endif

// kqueue: the BSD family, and macOS when MAC_OS_X_VERSION_10_6 is defined.
#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || \
    defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
#define HAVE_KQUEUE 1
#endif

// evport: only when <sys/feature_tests.h> defines _DTRACE_VERSION on __sun.
#if defined(__sun)
#include <sys/feature_tests.h>
#if defined(_DTRACE_VERSION)
#define HAVE_EVPORT 1
#endif
#endif
39
40 #include <atomic>
41 #include <mutex>
42 #include <condition_variable>
43
44 #include "common/ceph_time.h"
45 #include "common/dout.h"
46 #include "net_handler.h"
47
// Event mask values. EVENT_READABLE and EVENT_WRITABLE occupy distinct bits,
// so they can be OR-ed together into a single int mask; EVENT_NONE is the
// empty mask.
#define EVENT_NONE 0
#define EVENT_READABLE 1
#define EVENT_WRITABLE 2

// Defined further down; declared early because EventDriver::init() takes an
// EventCenter pointer.
class EventCenter;
53
/**
 * Abstract callback interface. EventCenter accepts these (through
 * EventCallbackRef) for file, time and externally dispatched events.
 */
class EventCallback {
 public:
  /// Run the callback. Judging by the parameter name, the argument is either
  /// a file descriptor or an event id, depending on how the callback was
  /// registered -- confirm against the caller.
  virtual void do_request(int fd_or_id) = 0;

  /// Virtual so that implementations can be destroyed through a base pointer.
  virtual ~EventCallback() = default;
};

/// Plain (non-owning as far as the type is concerned) pointer to a callback.
using EventCallbackRef = EventCallback*;
62
/// One entry of the list filled in by EventDriver::event_wait().
struct FiredFileEvent {
  int fd;    ///< file descriptor the entry refers to
  int mask;  ///< presumably a combination of EVENT_READABLE / EVENT_WRITABLE
};
67
68 /*
69 * EventDriver is a wrap of event mechanisms depends on different OS.
70 * For example, Linux will use epoll(2), BSD will use kqueue(2) and select will
71 * be used for worst condition.
72 */
73 class EventDriver {
74 public:
75 virtual ~EventDriver() {} // we want a virtual destructor!!!
76 virtual int init(EventCenter *center, int nevent) = 0;
77 virtual int add_event(int fd, int cur_mask, int mask) = 0;
78 virtual int del_event(int fd, int cur_mask, int del_mask) = 0;
79 virtual int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0;
80 virtual int resize_events(int newsize) = 0;
81 virtual bool need_wakeup() { return true; }
82 };
83
84 /*
85 * EventCenter maintain a set of file descriptor and handle registered events.
86 */
87 class EventCenter {
88 public:
89 // should be enough;
90 static const int MAX_EVENTCENTER = 24;
91
92 private:
93 using clock_type = ceph::coarse_mono_clock;
94
95 struct AssociatedCenters {
96 EventCenter *centers[MAX_EVENTCENTER];
97 AssociatedCenters(CephContext *c) {
98 memset(centers, 0, MAX_EVENTCENTER * sizeof(EventCenter*));
99 }
100 };
101
102 struct FileEvent {
103 int mask;
104 EventCallbackRef read_cb;
105 EventCallbackRef write_cb;
106 FileEvent(): mask(0), read_cb(NULL), write_cb(NULL) {}
107 };
108
109 struct TimeEvent {
110 uint64_t id;
111 EventCallbackRef time_cb;
112
113 TimeEvent(): id(0), time_cb(NULL) {}
114 };
115
116 public:
117 /**
118 * A Poller object is invoked once each time through the dispatcher's
119 * inner polling loop.
120 */
121 class Poller {
122 public:
123 explicit Poller(EventCenter* center, const string& pollerName);
124 virtual ~Poller();
125
126 /**
127 * This method is defined by a subclass and invoked once by the
128 * center during each pass through its inner polling loop.
129 *
130 * \return
131 * 1 means that this poller did useful work during this call.
132 * 0 means that the poller found no work to do.
133 */
134 virtual int poll() = 0;
135
136 private:
137 /// The EventCenter object that owns this Poller. NULL means the
138 /// EventCenter has been deleted.
139 EventCenter* owner;
140
141 /// Human-readable string name given to the poller to make it
142 /// easy to identify for debugging. For most pollers just passing
143 /// in the subclass name probably makes sense.
144 string poller_name;
145
146 /// Index of this Poller in EventCenter::pollers. Allows deletion
147 /// without having to scan all the entries in pollers. -1 means
148 /// this poller isn't currently in EventCenter::pollers (happens
149 /// after EventCenter::reset).
150 int slot;
151 };
152
153 private:
154 CephContext *cct;
155 std::string type;
156 int nevent;
157 // Used only to external event
158 pthread_t owner;
159 std::mutex external_lock;
160 std::atomic_ulong external_num_events;
161 deque<EventCallbackRef> external_events;
162 vector<FileEvent> file_events;
163 EventDriver *driver;
164 std::multimap<clock_type::time_point, TimeEvent> time_events;
165 // Keeps track of all of the pollers currently defined. We don't
166 // use an intrusive list here because it isn't reentrant: we need
167 // to add/remove elements while the center is traversing the list.
168 std::vector<Poller*> pollers;
169 std::map<uint64_t, std::multimap<clock_type::time_point, TimeEvent>::iterator> event_map;
170 uint64_t time_event_next_id;
171 int notify_receive_fd;
172 int notify_send_fd;
173 NetHandler net;
174 EventCallbackRef notify_handler;
175 unsigned idx;
176 AssociatedCenters *global_centers = nullptr;
177
178 int process_time_events();
179 FileEvent *_get_file_event(int fd) {
180 assert(fd < nevent);
181 return &file_events[fd];
182 }
183
184 public:
185 explicit EventCenter(CephContext *c):
186 cct(c), nevent(0),
187 external_num_events(0),
188 driver(NULL), time_event_next_id(1),
189 notify_receive_fd(-1), notify_send_fd(-1), net(c),
190 notify_handler(NULL), idx(0) { }
191 ~EventCenter();
192 ostream& _event_prefix(std::ostream *_dout);
193
194 int init(int nevent, unsigned idx, const std::string &t);
195 void set_owner();
196 pthread_t get_owner() const { return owner; }
197 unsigned get_id() const { return idx; }
198
199 EventDriver *get_driver() { return driver; }
200
201 // Used by internal thread
202 int create_file_event(int fd, int mask, EventCallbackRef ctxt);
203 uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
204 void delete_file_event(int fd, int mask);
205 void delete_time_event(uint64_t id);
206 int process_events(int timeout_microseconds, ceph::timespan *working_dur = nullptr);
207 void wakeup();
208
209 // Used by external thread
210 void dispatch_event_external(EventCallbackRef e);
211 inline bool in_thread() const {
212 return pthread_equal(pthread_self(), owner);
213 }
214
215 private:
216 template <typename func>
217 class C_submit_event : public EventCallback {
218 std::mutex lock;
219 std::condition_variable cond;
220 bool done = false;
221 func f;
222 bool nonwait;
223 public:
224 C_submit_event(func &&_f, bool nw)
225 : f(std::move(_f)), nonwait(nw) {}
226 void do_request(int id) override {
227 f();
228 lock.lock();
229 cond.notify_all();
230 done = true;
231 bool del = nonwait;
232 lock.unlock();
233 if (del)
234 delete this;
235 }
236 void wait() {
237 assert(!nonwait);
238 std::unique_lock<std::mutex> l(lock);
239 while (!done)
240 cond.wait(l);
241 }
242 };
243
244 public:
245 template <typename func>
246 void submit_to(int i, func &&f, bool nowait = false) {
247 assert(i < MAX_EVENTCENTER && global_centers);
248 EventCenter *c = global_centers->centers[i];
249 assert(c);
250 if (!nowait && c->in_thread()) {
251 f();
252 return ;
253 }
254 if (nowait) {
255 C_submit_event<func> *event = new C_submit_event<func>(std::move(f), true);
256 c->dispatch_event_external(event);
257 } else {
258 C_submit_event<func> event(std::move(f), false);
259 c->dispatch_event_external(&event);
260 event.wait();
261 }
262 };
263 };
264
265 #endif