git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/Event.h
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / msg / async / Event.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_MSG_EVENT_H
18 #define CEPH_MSG_EVENT_H
19
20 #ifdef __APPLE__
21 #include <AvailabilityMacros.h>
22 #endif
23
// Pick the event-notification backend for this platform. In descending
// order of performance the candidates are: epoll, kqueue, evport, select.
// This block only defines the HAVE_* macro(s) matching the platform.
#ifdef __linux__
#  define HAVE_EPOLL 1
#endif

// kqueue: the BSD family, and macOS when MAC_OS_X_VERSION_10_6 is defined.
#if defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || \
    (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6))
#  define HAVE_KQUEUE 1
#endif

// evport: only on __sun builds whose feature tests define _DTRACE_VERSION.
#if defined(__sun)
#  include <sys/feature_tests.h>
#  if defined(_DTRACE_VERSION)
#    define HAVE_EVPORT 1
#  endif
#endif
39
#include <atomic>
#include <condition_variable>
#include <deque>
#include <iosfwd>
#include <map>
#include <mutex>
#include <string>
#include <vector>

#include "common/ceph_time.h"
#include "common/dout.h"
#include "net_handler.h"
47
// Forward declaration so that interfaces declared ahead of the class
// definition can take EventCenter pointers.
class EventCenter;

// Event mask values. READABLE and WRITABLE occupy distinct bits, so they
// can presumably be combined with bitwise OR -- confirm against the users.
#define EVENT_NONE     0
#define EVENT_READABLE 1
#define EVENT_WRITABLE 2
53
/*
 * Abstract callback interface. Subclasses implement do_request(), which is
 * handed a single 64-bit value: a file descriptor or an event id, as the
 * parameter name indicates.
 */
class EventCallback {
 public:
  virtual void do_request(uint64_t fd_or_id) = 0;

  // Virtual so that destroying a callback through an EventCallback pointer
  // runs the derived class's destructor.
  virtual ~EventCallback() = default;
};

// Plain raw-pointer alias; despite the "Ref" suffix nothing here performs
// reference counting.
using EventCallbackRef = EventCallback*;
62
/*
 * One fired file event: the descriptor it fired on and the mask of what
 * fired. EventDriver::event_wait() fills a vector of these.
 *
 * Both members default to 0 so a default-constructed instance never carries
 * indeterminate values; aggregate initialisation ({fd, mask}) still works.
 */
struct FiredFileEvent {
  int fd = 0;    // file descriptor the event belongs to
  int mask = 0;  // presumably a combination of EVENT_* bits -- confirm
};
67
68 /*
69 * EventDriver is a wrap of event mechanisms depends on different OS.
70 * For example, Linux will use epoll(2), BSD will use kqueue(2) and select will
71 * be used for worst condition.
72 */
73 class EventDriver {
74 public:
75 virtual ~EventDriver() {} // we want a virtual destructor!!!
76 virtual int init(EventCenter *center, int nevent) = 0;
77 virtual int add_event(int fd, int cur_mask, int mask) = 0;
78 virtual int del_event(int fd, int cur_mask, int del_mask) = 0;
79 virtual int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0;
80 virtual int resize_events(int newsize) = 0;
81 virtual bool need_wakeup() { return true; }
82 };
83
84 /*
85 * EventCenter maintain a set of file descriptor and handle registered events.
86 */
87 class EventCenter {
88 public:
89 // should be enough;
90 static const int MAX_EVENTCENTER = 24;
91
92 private:
93 using clock_type = ceph::coarse_mono_clock;
94
95 struct AssociatedCenters {
96 EventCenter *centers[MAX_EVENTCENTER];
97 AssociatedCenters() {
98 // FIPS zeroization audit 20191115: this memset is not security related.
99 memset(centers, 0, MAX_EVENTCENTER * sizeof(EventCenter*));
100 }
101 };
102
103 struct FileEvent {
104 int mask;
105 EventCallbackRef read_cb;
106 EventCallbackRef write_cb;
107 FileEvent(): mask(0), read_cb(NULL), write_cb(NULL) {}
108 };
109
110 struct TimeEvent {
111 uint64_t id;
112 EventCallbackRef time_cb;
113
114 TimeEvent(): id(0), time_cb(NULL) {}
115 };
116
117 public:
118 /**
119 * A Poller object is invoked once each time through the dispatcher's
120 * inner polling loop.
121 */
122 class Poller {
123 public:
124 explicit Poller(EventCenter* center, const string& pollerName);
125 virtual ~Poller();
126
127 /**
128 * This method is defined by a subclass and invoked once by the
129 * center during each pass through its inner polling loop.
130 *
131 * \return
132 * 1 means that this poller did useful work during this call.
133 * 0 means that the poller found no work to do.
134 */
135 virtual int poll() = 0;
136
137 private:
138 /// The EventCenter object that owns this Poller. NULL means the
139 /// EventCenter has been deleted.
140 EventCenter* owner;
141
142 /// Human-readable string name given to the poller to make it
143 /// easy to identify for debugging. For most pollers just passing
144 /// in the subclass name probably makes sense.
145 string poller_name;
146
147 /// Index of this Poller in EventCenter::pollers. Allows deletion
148 /// without having to scan all the entries in pollers. -1 means
149 /// this poller isn't currently in EventCenter::pollers (happens
150 /// after EventCenter::reset).
151 int slot;
152 };
153
154 private:
155 CephContext *cct;
156 std::string type;
157 int nevent;
158 // Used only to external event
159 pthread_t owner = 0;
160 std::mutex external_lock;
161 std::atomic_ulong external_num_events;
162 deque<EventCallbackRef> external_events;
163 vector<FileEvent> file_events;
164 EventDriver *driver;
165 std::multimap<clock_type::time_point, TimeEvent> time_events;
166 // Keeps track of all of the pollers currently defined. We don't
167 // use an intrusive list here because it isn't reentrant: we need
168 // to add/remove elements while the center is traversing the list.
169 std::vector<Poller*> pollers;
170 std::map<uint64_t, std::multimap<clock_type::time_point, TimeEvent>::iterator> event_map;
171 uint64_t time_event_next_id;
172 int notify_receive_fd;
173 int notify_send_fd;
174 NetHandler net;
175 EventCallbackRef notify_handler;
176 unsigned idx;
177 AssociatedCenters *global_centers = nullptr;
178
179 int process_time_events();
180 FileEvent *_get_file_event(int fd) {
181 ceph_assert(fd < nevent);
182 return &file_events[fd];
183 }
184
185 public:
186 explicit EventCenter(CephContext *c):
187 cct(c), nevent(0),
188 external_num_events(0),
189 driver(NULL), time_event_next_id(1),
190 notify_receive_fd(-1), notify_send_fd(-1), net(c),
191 notify_handler(NULL), idx(0) { }
192 ~EventCenter();
193 ostream& _event_prefix(std::ostream *_dout);
194
195 int init(int nevent, unsigned idx, const std::string &t);
196 void set_owner();
197 pthread_t get_owner() const { return owner; }
198 unsigned get_id() const { return idx; }
199
200 EventDriver *get_driver() { return driver; }
201
202 // Used by internal thread
203 int create_file_event(int fd, int mask, EventCallbackRef ctxt);
204 uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
205 void delete_file_event(int fd, int mask);
206 void delete_time_event(uint64_t id);
207 int process_events(unsigned timeout_microseconds, ceph::timespan *working_dur = nullptr);
208 void wakeup();
209
210 // Used by external thread
211 void dispatch_event_external(EventCallbackRef e);
212 inline bool in_thread() const {
213 return pthread_equal(pthread_self(), owner);
214 }
215
216 private:
217 template <typename func>
218 class C_submit_event : public EventCallback {
219 std::mutex lock;
220 std::condition_variable cond;
221 bool done = false;
222 func f;
223 bool nonwait;
224 public:
225 C_submit_event(func &&_f, bool nw)
226 : f(std::move(_f)), nonwait(nw) {}
227 void do_request(uint64_t id) override {
228 f();
229 lock.lock();
230 cond.notify_all();
231 done = true;
232 bool del = nonwait;
233 lock.unlock();
234 if (del)
235 delete this;
236 }
237 void wait() {
238 ceph_assert(!nonwait);
239 std::unique_lock<std::mutex> l(lock);
240 while (!done)
241 cond.wait(l);
242 }
243 };
244
245 public:
246 template <typename func>
247 void submit_to(int i, func &&f, bool nowait = false) {
248 ceph_assert(i < MAX_EVENTCENTER && global_centers);
249 EventCenter *c = global_centers->centers[i];
250 ceph_assert(c);
251 if (!nowait && c->in_thread()) {
252 f();
253 return ;
254 }
255 if (nowait) {
256 C_submit_event<func> *event = new C_submit_event<func>(std::move(f), true);
257 c->dispatch_event_external(event);
258 } else {
259 C_submit_event<func> event(std::move(f), false);
260 c->dispatch_event_external(&event);
261 event.wait();
262 }
263 };
264 };
265
266 #endif