]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include "common/errno.h" | |
18 | #include "Event.h" | |
19 | ||
20 | #ifdef HAVE_DPDK | |
21 | #include "dpdk/EventDPDK.h" | |
22 | #endif | |
23 | ||
24 | #ifdef HAVE_EPOLL | |
25 | #include "EventEpoll.h" | |
26 | #else | |
27 | #ifdef HAVE_KQUEUE | |
28 | #include "EventKqueue.h" | |
29 | #else | |
30 | #include "EventSelect.h" | |
31 | #endif | |
32 | #endif | |
33 | ||
34 | #define dout_subsys ceph_subsys_ms | |
35 | ||
36 | #undef dout_prefix | |
37 | #define dout_prefix *_dout << "EventCallback " | |
38 | class C_handle_notify : public EventCallback { | |
39 | EventCenter *center; | |
40 | CephContext *cct; | |
41 | ||
42 | public: | |
43 | C_handle_notify(EventCenter *c, CephContext *cc): center(c), cct(cc) {} | |
44 | void do_request(int fd_or_id) override { | |
45 | char c[256]; | |
46 | int r = 0; | |
47 | do { | |
48 | r = read(fd_or_id, c, sizeof(c)); | |
49 | if (r < 0) { | |
50 | if (errno != EAGAIN) | |
51 | ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl; | |
52 | } | |
53 | } while (r > 0); | |
54 | } | |
55 | }; | |
56 | ||
57 | #undef dout_prefix | |
58 | #define dout_prefix _event_prefix(_dout) | |
59 | ||
60 | /** | |
61 | * Construct a Poller. | |
62 | * | |
63 | * \param center | |
64 | * EventCenter object through which the poller will be invoked (defaults | |
65 | * to the global #RAMCloud::center object). | |
66 | * \param pollerName | |
67 | * Human readable name that can be printed out in debugging messages | |
68 | * about the poller. The name of the superclass is probably sufficient | |
69 | * for most cases. | |
70 | */ | |
71 | EventCenter::Poller::Poller(EventCenter* center, const string& name) | |
72 | : owner(center), poller_name(name), slot(owner->pollers.size()) | |
73 | { | |
74 | owner->pollers.push_back(this); | |
75 | } | |
76 | ||
77 | /** | |
78 | * Destroy a Poller. | |
79 | */ | |
80 | EventCenter::Poller::~Poller() | |
81 | { | |
82 | // Erase this Poller from the vector by overwriting it with the | |
83 | // poller that used to be the last one in the vector. | |
84 | // | |
85 | // Note: this approach is reentrant (it is safe to delete a | |
86 | // poller from a poller callback, which means that the poll | |
87 | // method is in the middle of scanning the list of all pollers; | |
88 | // the worst that will happen is that the poller that got moved | |
89 | // may not be invoked in the current scan). | |
90 | owner->pollers[slot] = owner->pollers.back(); | |
91 | owner->pollers[slot]->slot = slot; | |
92 | owner->pollers.pop_back(); | |
93 | slot = -1; | |
94 | } | |
95 | ||
96 | ostream& EventCenter::_event_prefix(std::ostream *_dout) | |
97 | { | |
98 | return *_dout << "Event(" << this << " nevent=" << nevent | |
99 | << " time_id=" << time_event_next_id << ")."; | |
100 | } | |
101 | ||
102 | int EventCenter::init(int n, unsigned i, const std::string &t) | |
103 | { | |
104 | // can't init multi times | |
105 | assert(nevent == 0); | |
106 | ||
107 | type = t; | |
108 | idx = i; | |
109 | ||
110 | if (t == "dpdk") { | |
111 | #ifdef HAVE_DPDK | |
112 | driver = new DPDKDriver(cct); | |
113 | #endif | |
114 | } else { | |
115 | #ifdef HAVE_EPOLL | |
116 | driver = new EpollDriver(cct); | |
117 | #else | |
118 | #ifdef HAVE_KQUEUE | |
119 | driver = new KqueueDriver(cct); | |
120 | #else | |
121 | driver = new SelectDriver(cct); | |
122 | #endif | |
123 | #endif | |
124 | } | |
125 | ||
126 | if (!driver) { | |
127 | lderr(cct) << __func__ << " failed to create event driver " << dendl; | |
128 | return -1; | |
129 | } | |
130 | ||
131 | int r = driver->init(this, n); | |
132 | if (r < 0) { | |
133 | lderr(cct) << __func__ << " failed to init event driver." << dendl; | |
134 | return r; | |
135 | } | |
136 | ||
137 | file_events.resize(n); | |
138 | nevent = n; | |
139 | ||
140 | if (!driver->need_wakeup()) | |
141 | return 0; | |
142 | ||
143 | int fds[2]; | |
144 | if (pipe(fds) < 0) { | |
145 | lderr(cct) << __func__ << " can't create notify pipe" << dendl; | |
146 | return -errno; | |
147 | } | |
148 | ||
149 | notify_receive_fd = fds[0]; | |
150 | notify_send_fd = fds[1]; | |
151 | r = net.set_nonblock(notify_receive_fd); | |
152 | if (r < 0) { | |
153 | return r; | |
154 | } | |
155 | r = net.set_nonblock(notify_send_fd); | |
156 | if (r < 0) { | |
157 | return r; | |
158 | } | |
159 | ||
160 | return r; | |
161 | } | |
162 | ||
163 | EventCenter::~EventCenter() | |
164 | { | |
165 | { | |
166 | std::lock_guard<std::mutex> l(external_lock); | |
167 | while (!external_events.empty()) { | |
168 | EventCallbackRef e = external_events.front(); | |
169 | if (e) | |
170 | e->do_request(0); | |
171 | external_events.pop_front(); | |
172 | } | |
173 | } | |
174 | assert(time_events.empty()); | |
175 | ||
176 | if (notify_receive_fd >= 0) | |
177 | ::close(notify_receive_fd); | |
178 | if (notify_send_fd >= 0) | |
179 | ::close(notify_send_fd); | |
180 | ||
181 | delete driver; | |
182 | if (notify_handler) | |
183 | delete notify_handler; | |
184 | } | |
185 | ||
186 | ||
187 | void EventCenter::set_owner() | |
188 | { | |
189 | owner = pthread_self(); | |
190 | ldout(cct, 2) << __func__ << " idx=" << idx << " owner=" << owner << dendl; | |
191 | if (!global_centers) { | |
192 | cct->lookup_or_create_singleton_object<EventCenter::AssociatedCenters>( | |
193 | global_centers, "AsyncMessenger::EventCenter::global_center::"+type); | |
194 | assert(global_centers); | |
195 | global_centers->centers[idx] = this; | |
196 | if (driver->need_wakeup()) { | |
197 | notify_handler = new C_handle_notify(this, cct); | |
198 | int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler); | |
199 | assert(r == 0); | |
200 | } | |
201 | } | |
202 | } | |
203 | ||
204 | int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) | |
205 | { | |
206 | assert(in_thread()); | |
207 | int r = 0; | |
208 | if (fd >= nevent) { | |
209 | int new_size = nevent << 2; | |
210 | while (fd > new_size) | |
211 | new_size <<= 2; | |
212 | ldout(cct, 20) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl; | |
213 | r = driver->resize_events(new_size); | |
214 | if (r < 0) { | |
215 | lderr(cct) << __func__ << " event count is exceed." << dendl; | |
216 | return -ERANGE; | |
217 | } | |
218 | file_events.resize(new_size); | |
219 | nevent = new_size; | |
220 | } | |
221 | ||
222 | EventCenter::FileEvent *event = _get_file_event(fd); | |
223 | ldout(cct, 20) << __func__ << " create event started fd=" << fd << " mask=" << mask | |
224 | << " original mask is " << event->mask << dendl; | |
225 | if (event->mask == mask) | |
226 | return 0; | |
227 | ||
228 | r = driver->add_event(fd, event->mask, mask); | |
229 | if (r < 0) { | |
230 | // Actually we don't allow any failed error code, caller doesn't prepare to | |
231 | // handle error status. So now we need to assert failure here. In practice, | |
232 | // add_event shouldn't report error, otherwise it must be a innermost bug! | |
233 | assert(0 == "BUG!"); | |
234 | return r; | |
235 | } | |
236 | ||
237 | event->mask |= mask; | |
238 | if (mask & EVENT_READABLE) { | |
239 | event->read_cb = ctxt; | |
240 | } | |
241 | if (mask & EVENT_WRITABLE) { | |
242 | event->write_cb = ctxt; | |
243 | } | |
244 | ldout(cct, 20) << __func__ << " create event end fd=" << fd << " mask=" << mask | |
245 | << " original mask is " << event->mask << dendl; | |
246 | return 0; | |
247 | } | |
248 | ||
249 | void EventCenter::delete_file_event(int fd, int mask) | |
250 | { | |
251 | assert(in_thread() && fd >= 0); | |
252 | if (fd >= nevent) { | |
253 | ldout(cct, 1) << __func__ << " delete event fd=" << fd << " is equal or greater than nevent=" << nevent | |
254 | << "mask=" << mask << dendl; | |
255 | return ; | |
256 | } | |
257 | EventCenter::FileEvent *event = _get_file_event(fd); | |
258 | ldout(cct, 30) << __func__ << " delete event started fd=" << fd << " mask=" << mask | |
259 | << " original mask is " << event->mask << dendl; | |
260 | if (!event->mask) | |
261 | return ; | |
262 | ||
263 | int r = driver->del_event(fd, event->mask, mask); | |
264 | if (r < 0) { | |
265 | // see create_file_event | |
266 | assert(0 == "BUG!"); | |
267 | } | |
268 | ||
269 | if (mask & EVENT_READABLE && event->read_cb) { | |
270 | event->read_cb = nullptr; | |
271 | } | |
272 | if (mask & EVENT_WRITABLE && event->write_cb) { | |
273 | event->write_cb = nullptr; | |
274 | } | |
275 | ||
276 | event->mask = event->mask & (~mask); | |
277 | ldout(cct, 30) << __func__ << " delete event end fd=" << fd << " mask=" << mask | |
278 | << " original mask is " << event->mask << dendl; | |
279 | } | |
280 | ||
281 | uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt) | |
282 | { | |
283 | assert(in_thread()); | |
284 | uint64_t id = time_event_next_id++; | |
285 | ||
286 | ldout(cct, 30) << __func__ << " id=" << id << " trigger after " << microseconds << "us"<< dendl; | |
287 | EventCenter::TimeEvent event; | |
288 | clock_type::time_point expire = clock_type::now() + std::chrono::microseconds(microseconds); | |
289 | event.id = id; | |
290 | event.time_cb = ctxt; | |
291 | std::multimap<clock_type::time_point, TimeEvent>::value_type s_val(expire, event); | |
292 | auto it = time_events.insert(std::move(s_val)); | |
293 | event_map[id] = it; | |
294 | ||
295 | return id; | |
296 | } | |
297 | ||
298 | void EventCenter::delete_time_event(uint64_t id) | |
299 | { | |
300 | assert(in_thread()); | |
301 | ldout(cct, 30) << __func__ << " id=" << id << dendl; | |
302 | if (id >= time_event_next_id || id == 0) | |
303 | return ; | |
304 | ||
305 | auto it = event_map.find(id); | |
306 | if (it == event_map.end()) { | |
307 | ldout(cct, 10) << __func__ << " id=" << id << " not found" << dendl; | |
308 | return ; | |
309 | } | |
310 | ||
311 | time_events.erase(it->second); | |
312 | event_map.erase(it); | |
313 | } | |
314 | ||
315 | void EventCenter::wakeup() | |
316 | { | |
317 | // No need to wake up since we never sleep | |
318 | if (!pollers.empty() || !driver->need_wakeup()) | |
319 | return ; | |
320 | ||
321 | ldout(cct, 20) << __func__ << dendl; | |
322 | char buf = 'c'; | |
323 | // wake up "event_wait" | |
324 | int n = write(notify_send_fd, &buf, sizeof(buf)); | |
325 | if (n < 0) { | |
326 | if (errno != EAGAIN) { | |
327 | ldout(cct, 1) << __func__ << " write notify pipe failed: " << cpp_strerror(errno) << dendl; | |
328 | ceph_abort(); | |
329 | } | |
330 | } | |
331 | } | |
332 | ||
333 | int EventCenter::process_time_events() | |
334 | { | |
335 | int processed = 0; | |
336 | clock_type::time_point now = clock_type::now(); | |
337 | ldout(cct, 30) << __func__ << " cur time is " << now << dendl; | |
338 | ||
339 | while (!time_events.empty()) { | |
340 | auto it = time_events.begin(); | |
341 | if (now >= it->first) { | |
342 | TimeEvent &e = it->second; | |
343 | EventCallbackRef cb = e.time_cb; | |
344 | uint64_t id = e.id; | |
345 | time_events.erase(it); | |
346 | event_map.erase(id); | |
347 | ldout(cct, 30) << __func__ << " process time event: id=" << id << dendl; | |
348 | processed++; | |
349 | cb->do_request(id); | |
350 | } else { | |
351 | break; | |
352 | } | |
353 | } | |
354 | ||
355 | return processed; | |
356 | } | |
357 | ||
31f18b77 | 358 | int EventCenter::process_events(int timeout_microseconds, ceph::timespan *working_dur) |
7c673cae FG |
359 | { |
360 | struct timeval tv; | |
361 | int numevents; | |
362 | bool trigger_time = false; | |
363 | auto now = clock_type::now(); | |
364 | ||
365 | auto it = time_events.begin(); | |
366 | bool blocking = pollers.empty() && !external_num_events.load(); | |
367 | // If exists external events or poller, don't block | |
368 | if (!blocking) { | |
369 | if (it != time_events.end() && now >= it->first) | |
370 | trigger_time = true; | |
371 | tv.tv_sec = 0; | |
372 | tv.tv_usec = 0; | |
373 | } else { | |
374 | clock_type::time_point shortest; | |
375 | shortest = now + std::chrono::microseconds(timeout_microseconds); | |
376 | ||
377 | if (it != time_events.end() && shortest >= it->first) { | |
378 | ldout(cct, 30) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl; | |
379 | shortest = it->first; | |
380 | trigger_time = true; | |
381 | if (shortest > now) { | |
382 | timeout_microseconds = std::chrono::duration_cast<std::chrono::microseconds>( | |
383 | shortest - now).count(); | |
384 | } else { | |
385 | shortest = now; | |
386 | timeout_microseconds = 0; | |
387 | } | |
388 | } | |
389 | tv.tv_sec = timeout_microseconds / 1000000; | |
390 | tv.tv_usec = timeout_microseconds % 1000000; | |
391 | } | |
392 | ||
393 | ldout(cct, 30) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl; | |
394 | vector<FiredFileEvent> fired_events; | |
395 | numevents = driver->event_wait(fired_events, &tv); | |
31f18b77 | 396 | auto working_start = ceph::mono_clock::now(); |
7c673cae FG |
397 | for (int j = 0; j < numevents; j++) { |
398 | int rfired = 0; | |
399 | FileEvent *event; | |
400 | EventCallbackRef cb; | |
401 | event = _get_file_event(fired_events[j].fd); | |
402 | ||
403 | /* note the event->mask & mask & ... code: maybe an already processed | |
404 | * event removed an element that fired and we still didn't | |
405 | * processed, so we check if the event is still valid. */ | |
406 | if (event->mask & fired_events[j].mask & EVENT_READABLE) { | |
407 | rfired = 1; | |
408 | cb = event->read_cb; | |
409 | cb->do_request(fired_events[j].fd); | |
410 | } | |
411 | ||
412 | if (event->mask & fired_events[j].mask & EVENT_WRITABLE) { | |
413 | if (!rfired || event->read_cb != event->write_cb) { | |
414 | cb = event->write_cb; | |
415 | cb->do_request(fired_events[j].fd); | |
416 | } | |
417 | } | |
418 | ||
419 | ldout(cct, 30) << __func__ << " event_wq process is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl; | |
420 | } | |
421 | ||
422 | if (trigger_time) | |
423 | numevents += process_time_events(); | |
424 | ||
425 | if (external_num_events.load()) { | |
426 | external_lock.lock(); | |
427 | deque<EventCallbackRef> cur_process; | |
428 | cur_process.swap(external_events); | |
429 | external_num_events.store(0); | |
430 | external_lock.unlock(); | |
431 | while (!cur_process.empty()) { | |
432 | EventCallbackRef e = cur_process.front(); | |
433 | ldout(cct, 30) << __func__ << " do " << e << dendl; | |
434 | e->do_request(0); | |
435 | cur_process.pop_front(); | |
436 | numevents++; | |
437 | } | |
438 | } | |
439 | ||
440 | if (!numevents && !blocking) { | |
441 | for (uint32_t i = 0; i < pollers.size(); i++) | |
442 | numevents += pollers[i]->poll(); | |
443 | } | |
444 | ||
31f18b77 FG |
445 | if (working_dur) |
446 | *working_dur = ceph::mono_clock::now() - working_start; | |
7c673cae FG |
447 | return numevents; |
448 | } | |
449 | ||
450 | void EventCenter::dispatch_event_external(EventCallbackRef e) | |
451 | { | |
452 | external_lock.lock(); | |
453 | external_events.push_back(e); | |
454 | bool wake = !external_num_events.load(); | |
455 | uint64_t num = ++external_num_events; | |
456 | external_lock.unlock(); | |
457 | if (!in_thread() && wake) | |
458 | wakeup(); | |
459 | ||
460 | ldout(cct, 30) << __func__ << " " << e << " pending " << num << dendl; | |
461 | } |