]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/Event.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / msg / async / Event.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_MSG_EVENT_H
18 #define CEPH_MSG_EVENT_H
19
20 #ifdef __APPLE__
21 #include <AvailabilityMacros.h>
22 #endif
23
// Event backend selection.
// We use epoll, kqueue, evport, select in descending order by performance.
// Each HAVE_* macro below is defined to 1 when the corresponding platform
// test succeeds and is left undefined otherwise.

// Linux: epoll.
#if defined(__linux__)
#define HAVE_EPOLL 1
#endif

// macOS (when MAC_OS_X_VERSION_10_6 is defined) and the BSDs: kqueue.
#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || \
    defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
#define HAVE_KQUEUE 1
#endif

// Windows: poll.
#if defined(_WIN32)
#define HAVE_POLL 1
#endif

// Solaris: event ports, only when the system headers define _DTRACE_VERSION.
#if defined(__sun)
#include <sys/feature_tests.h>
#if defined(_DTRACE_VERSION)
#define HAVE_EVPORT 1
#endif
#endif
43
44 #include <atomic>
45 #include <mutex>
46 #include <condition_variable>
47
48 #include "common/ceph_time.h"
49 #include "common/dout.h"
50 #include "net_handler.h"
51
// Event mask values.
// NOTE(review): READABLE and WRITABLE have disjoint bits, so they look like
// flags meant to be OR-ed into the `mask` arguments used further down in this
// header -- confirm against the driver implementations.
#define EVENT_NONE     0
#define EVENT_READABLE 1
#define EVENT_WRITABLE 2
55
56 class EventCenter;
57
/*
 * EventCallback is the abstract base for event handlers: subclasses put
 * their handling logic in do_request().
 */
class EventCallback {
 public:
  // Handler entry point, implemented by every concrete callback.
  // NOTE(review): judging by its name, fd_or_id carries either a file
  // descriptor or an event id depending on the kind of event -- confirm
  // with the call sites.
  virtual void do_request(uint64_t fd_or_id) = 0;

  // Virtual so that deleting a subclass through an EventCallback* runs the
  // subclass destructor.
  virtual ~EventCallback() = default;
};
64
65 typedef EventCallback* EventCallbackRef;
66
/*
 * FiredFileEvent is one entry of the vector filled in by
 * EventDriver::event_wait().
 */
struct FiredFileEvent {
  // File descriptor the entry refers to.
  int fd;
  // NOTE(review): presumably a combination of the EVENT_* values -- confirm.
  int mask;
};
71
72 /*
73 * EventDriver is a wrap of event mechanisms depends on different OS.
74 * For example, Linux will use epoll(2), BSD will use kqueue(2) and select will
75 * be used for worst condition.
76 */
77 class EventDriver {
78 public:
79 virtual ~EventDriver() {} // we want a virtual destructor!!!
80 virtual int init(EventCenter *center, int nevent) = 0;
81 virtual int add_event(int fd, int cur_mask, int mask) = 0;
82 virtual int del_event(int fd, int cur_mask, int del_mask) = 0;
83 virtual int event_wait(std::vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0;
84 virtual int resize_events(int newsize) = 0;
85 virtual bool need_wakeup() { return true; }
86 };
87
88 /*
89 * EventCenter maintain a set of file descriptor and handle registered events.
90 */
91 class EventCenter {
92 public:
93 // should be enough;
94 static const int MAX_EVENTCENTER = 24;
95
96 private:
97 using clock_type = ceph::coarse_mono_clock;
98
99 struct AssociatedCenters {
100 EventCenter *centers[MAX_EVENTCENTER];
101 AssociatedCenters() {
102 // FIPS zeroization audit 20191115: this memset is not security related.
103 memset(centers, 0, MAX_EVENTCENTER * sizeof(EventCenter*));
104 }
105 };
106
107 struct FileEvent {
108 int mask;
109 EventCallbackRef read_cb;
110 EventCallbackRef write_cb;
111 FileEvent(): mask(0), read_cb(NULL), write_cb(NULL) {}
112 };
113
114 struct TimeEvent {
115 uint64_t id;
116 EventCallbackRef time_cb;
117
118 TimeEvent(): id(0), time_cb(NULL) {}
119 };
120
121 public:
122 /**
123 * A Poller object is invoked once each time through the dispatcher's
124 * inner polling loop.
125 */
126 class Poller {
127 public:
128 explicit Poller(EventCenter* center, const std::string& pollerName);
129 virtual ~Poller();
130
131 /**
132 * This method is defined by a subclass and invoked once by the
133 * center during each pass through its inner polling loop.
134 *
135 * \return
136 * 1 means that this poller did useful work during this call.
137 * 0 means that the poller found no work to do.
138 */
139 virtual int poll() = 0;
140
141 private:
142 /// The EventCenter object that owns this Poller. NULL means the
143 /// EventCenter has been deleted.
144 EventCenter* owner;
145
146 /// Human-readable string name given to the poller to make it
147 /// easy to identify for debugging. For most pollers just passing
148 /// in the subclass name probably makes sense.
149 std::string poller_name;
150
151 /// Index of this Poller in EventCenter::pollers. Allows deletion
152 /// without having to scan all the entries in pollers. -1 means
153 /// this poller isn't currently in EventCenter::pollers (happens
154 /// after EventCenter::reset).
155 int slot;
156 };
157
158 private:
159 CephContext *cct;
160 std::string type;
161 int nevent;
162 // Used only to external event
163 pthread_t owner = 0;
164 std::mutex external_lock;
165 std::atomic_ulong external_num_events;
166 std::deque<EventCallbackRef> external_events;
167 std::vector<FileEvent> file_events;
168 EventDriver *driver;
169 std::multimap<clock_type::time_point, TimeEvent> time_events;
170 // Keeps track of all of the pollers currently defined. We don't
171 // use an intrusive list here because it isn't reentrant: we need
172 // to add/remove elements while the center is traversing the list.
173 std::vector<Poller*> pollers;
174 std::map<uint64_t, std::multimap<clock_type::time_point, TimeEvent>::iterator> event_map;
175 uint64_t time_event_next_id;
176 int notify_receive_fd;
177 int notify_send_fd;
178 ceph::NetHandler net;
179 EventCallbackRef notify_handler;
180 unsigned center_id;
181 AssociatedCenters *global_centers = nullptr;
182
183 int process_time_events();
184 FileEvent *_get_file_event(int fd) {
185 ceph_assert(fd < nevent);
186 return &file_events[fd];
187 }
188
189 public:
190 explicit EventCenter(CephContext *c):
191 cct(c), nevent(0),
192 external_num_events(0),
193 driver(NULL), time_event_next_id(1),
194 notify_receive_fd(-1), notify_send_fd(-1), net(c),
195 notify_handler(NULL), center_id(0) { }
196 ~EventCenter();
197 std::ostream& _event_prefix(std::ostream *_dout);
198
199 int init(int nevent, unsigned center_id, const std::string &type);
200 void set_owner();
201 pthread_t get_owner() const { return owner; }
202 unsigned get_id() const { return center_id; }
203
204 EventDriver *get_driver() { return driver; }
205
206 // Used by internal thread
207 int create_file_event(int fd, int mask, EventCallbackRef ctxt);
208 uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
209 void delete_file_event(int fd, int mask);
210 void delete_time_event(uint64_t id);
211 int process_events(unsigned timeout_microseconds, ceph::timespan *working_dur = nullptr);
212 void wakeup();
213
214 // Used by external thread
215 void dispatch_event_external(EventCallbackRef e);
216 inline bool in_thread() const {
217 return pthread_equal(pthread_self(), owner);
218 }
219
220 private:
221 template <typename func>
222 class C_submit_event : public EventCallback {
223 std::mutex lock;
224 std::condition_variable cond;
225 bool done = false;
226 func f;
227 bool nonwait;
228 public:
229 C_submit_event(func &&_f, bool nowait)
230 : f(std::move(_f)), nonwait(nowait) {}
231 void do_request(uint64_t id) override {
232 f();
233 lock.lock();
234 cond.notify_all();
235 done = true;
236 bool del = nonwait;
237 lock.unlock();
238 if (del)
239 delete this;
240 }
241 void wait() {
242 ceph_assert(!nonwait);
243 std::unique_lock<std::mutex> l(lock);
244 while (!done)
245 cond.wait(l);
246 }
247 };
248
249 public:
250 template <typename func>
251 void submit_to(int i, func &&f, bool always_async = false) {
252 ceph_assert(i < MAX_EVENTCENTER && global_centers);
253 EventCenter *c = global_centers->centers[i];
254 ceph_assert(c);
255 if (always_async) {
256 C_submit_event<func> *event = new C_submit_event<func>(std::move(f), true);
257 c->dispatch_event_external(event);
258 } else if (c->in_thread()) {
259 f();
260 return;
261 } else {
262 C_submit_event<func> event(std::move(f), false);
263 c->dispatch_event_external(&event);
264 event.wait();
265 }
266 };
267 };
268
269 #endif