]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
91327a77 | 17 | #include "include/compat.h" |
7c673cae FG |
18 | #include "common/errno.h" |
19 | #include "Event.h" | |
20 | ||
21 | #ifdef HAVE_DPDK | |
22 | #include "dpdk/EventDPDK.h" | |
23 | #endif | |
24 | ||
25 | #ifdef HAVE_EPOLL | |
26 | #include "EventEpoll.h" | |
27 | #else | |
28 | #ifdef HAVE_KQUEUE | |
29 | #include "EventKqueue.h" | |
30 | #else | |
31 | #include "EventSelect.h" | |
32 | #endif | |
33 | #endif | |
34 | ||
35 | #define dout_subsys ceph_subsys_ms | |
36 | ||
37 | #undef dout_prefix | |
38 | #define dout_prefix *_dout << "EventCallback " | |
39 | class C_handle_notify : public EventCallback { | |
40 | EventCenter *center; | |
41 | CephContext *cct; | |
42 | ||
43 | public: | |
44 | C_handle_notify(EventCenter *c, CephContext *cc): center(c), cct(cc) {} | |
11fdf7f2 | 45 | void do_request(uint64_t fd_or_id) override { |
7c673cae FG |
46 | char c[256]; |
47 | int r = 0; | |
48 | do { | |
49 | r = read(fd_or_id, c, sizeof(c)); | |
50 | if (r < 0) { | |
51 | if (errno != EAGAIN) | |
52 | ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl; | |
53 | } | |
54 | } while (r > 0); | |
55 | } | |
56 | }; | |
57 | ||
58 | #undef dout_prefix | |
59 | #define dout_prefix _event_prefix(_dout) | |
60 | ||
61 | /** | |
62 | * Construct a Poller. | |
63 | * | |
64 | * \param center | |
65 | * EventCenter object through which the poller will be invoked (defaults | |
66 | * to the global #RAMCloud::center object). | |
67 | * \param pollerName | |
68 | * Human readable name that can be printed out in debugging messages | |
69 | * about the poller. The name of the superclass is probably sufficient | |
70 | * for most cases. | |
71 | */ | |
72 | EventCenter::Poller::Poller(EventCenter* center, const string& name) | |
73 | : owner(center), poller_name(name), slot(owner->pollers.size()) | |
74 | { | |
75 | owner->pollers.push_back(this); | |
76 | } | |
77 | ||
78 | /** | |
79 | * Destroy a Poller. | |
80 | */ | |
81 | EventCenter::Poller::~Poller() | |
82 | { | |
83 | // Erase this Poller from the vector by overwriting it with the | |
84 | // poller that used to be the last one in the vector. | |
85 | // | |
86 | // Note: this approach is reentrant (it is safe to delete a | |
87 | // poller from a poller callback, which means that the poll | |
88 | // method is in the middle of scanning the list of all pollers; | |
89 | // the worst that will happen is that the poller that got moved | |
90 | // may not be invoked in the current scan). | |
91 | owner->pollers[slot] = owner->pollers.back(); | |
92 | owner->pollers[slot]->slot = slot; | |
93 | owner->pollers.pop_back(); | |
94 | slot = -1; | |
95 | } | |
96 | ||
97 | ostream& EventCenter::_event_prefix(std::ostream *_dout) | |
98 | { | |
99 | return *_dout << "Event(" << this << " nevent=" << nevent | |
100 | << " time_id=" << time_event_next_id << ")."; | |
101 | } | |
102 | ||
103 | int EventCenter::init(int n, unsigned i, const std::string &t) | |
104 | { | |
105 | // can't init multi times | |
11fdf7f2 | 106 | ceph_assert(nevent == 0); |
7c673cae FG |
107 | |
108 | type = t; | |
109 | idx = i; | |
110 | ||
111 | if (t == "dpdk") { | |
112 | #ifdef HAVE_DPDK | |
113 | driver = new DPDKDriver(cct); | |
114 | #endif | |
115 | } else { | |
116 | #ifdef HAVE_EPOLL | |
117 | driver = new EpollDriver(cct); | |
118 | #else | |
119 | #ifdef HAVE_KQUEUE | |
120 | driver = new KqueueDriver(cct); | |
121 | #else | |
122 | driver = new SelectDriver(cct); | |
123 | #endif | |
124 | #endif | |
125 | } | |
126 | ||
127 | if (!driver) { | |
128 | lderr(cct) << __func__ << " failed to create event driver " << dendl; | |
129 | return -1; | |
130 | } | |
131 | ||
132 | int r = driver->init(this, n); | |
133 | if (r < 0) { | |
134 | lderr(cct) << __func__ << " failed to init event driver." << dendl; | |
135 | return r; | |
136 | } | |
137 | ||
138 | file_events.resize(n); | |
139 | nevent = n; | |
140 | ||
141 | if (!driver->need_wakeup()) | |
142 | return 0; | |
143 | ||
144 | int fds[2]; | |
91327a77 AA |
145 | if (pipe_cloexec(fds) < 0) { |
146 | int e = errno; | |
147 | lderr(cct) << __func__ << " can't create notify pipe: " << cpp_strerror(e) << dendl; | |
148 | return -e; | |
7c673cae FG |
149 | } |
150 | ||
151 | notify_receive_fd = fds[0]; | |
152 | notify_send_fd = fds[1]; | |
153 | r = net.set_nonblock(notify_receive_fd); | |
154 | if (r < 0) { | |
155 | return r; | |
156 | } | |
157 | r = net.set_nonblock(notify_send_fd); | |
158 | if (r < 0) { | |
159 | return r; | |
160 | } | |
161 | ||
162 | return r; | |
163 | } | |
164 | ||
165 | EventCenter::~EventCenter() | |
166 | { | |
167 | { | |
168 | std::lock_guard<std::mutex> l(external_lock); | |
169 | while (!external_events.empty()) { | |
170 | EventCallbackRef e = external_events.front(); | |
171 | if (e) | |
172 | e->do_request(0); | |
173 | external_events.pop_front(); | |
174 | } | |
175 | } | |
11fdf7f2 TL |
176 | time_events.clear(); |
177 | //assert(time_events.empty()); | |
7c673cae FG |
178 | |
179 | if (notify_receive_fd >= 0) | |
180 | ::close(notify_receive_fd); | |
181 | if (notify_send_fd >= 0) | |
182 | ::close(notify_send_fd); | |
183 | ||
184 | delete driver; | |
185 | if (notify_handler) | |
186 | delete notify_handler; | |
187 | } | |
188 | ||
189 | ||
190 | void EventCenter::set_owner() | |
191 | { | |
192 | owner = pthread_self(); | |
193 | ldout(cct, 2) << __func__ << " idx=" << idx << " owner=" << owner << dendl; | |
194 | if (!global_centers) { | |
11fdf7f2 TL |
195 | global_centers = &cct->lookup_or_create_singleton_object< |
196 | EventCenter::AssociatedCenters>( | |
197 | "AsyncMessenger::EventCenter::global_center::" + type, true); | |
198 | ceph_assert(global_centers); | |
7c673cae FG |
199 | global_centers->centers[idx] = this; |
200 | if (driver->need_wakeup()) { | |
201 | notify_handler = new C_handle_notify(this, cct); | |
202 | int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler); | |
11fdf7f2 | 203 | ceph_assert(r == 0); |
7c673cae FG |
204 | } |
205 | } | |
206 | } | |
207 | ||
208 | int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) | |
209 | { | |
11fdf7f2 | 210 | ceph_assert(in_thread()); |
7c673cae FG |
211 | int r = 0; |
212 | if (fd >= nevent) { | |
213 | int new_size = nevent << 2; | |
94b18763 | 214 | while (fd >= new_size) |
7c673cae FG |
215 | new_size <<= 2; |
216 | ldout(cct, 20) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl; | |
217 | r = driver->resize_events(new_size); | |
218 | if (r < 0) { | |
219 | lderr(cct) << __func__ << " event count is exceed." << dendl; | |
220 | return -ERANGE; | |
221 | } | |
222 | file_events.resize(new_size); | |
223 | nevent = new_size; | |
224 | } | |
225 | ||
226 | EventCenter::FileEvent *event = _get_file_event(fd); | |
227 | ldout(cct, 20) << __func__ << " create event started fd=" << fd << " mask=" << mask | |
228 | << " original mask is " << event->mask << dendl; | |
229 | if (event->mask == mask) | |
230 | return 0; | |
231 | ||
232 | r = driver->add_event(fd, event->mask, mask); | |
233 | if (r < 0) { | |
234 | // Actually we don't allow any failed error code, caller doesn't prepare to | |
235 | // handle error status. So now we need to assert failure here. In practice, | |
236 | // add_event shouldn't report error, otherwise it must be a innermost bug! | |
11fdf7f2 TL |
237 | lderr(cct) << __func__ << " add event failed, ret=" << r << " fd=" << fd |
238 | << " mask=" << mask << " original mask is " << event->mask << dendl; | |
239 | ceph_abort_msg("BUG!"); | |
7c673cae FG |
240 | return r; |
241 | } | |
242 | ||
243 | event->mask |= mask; | |
244 | if (mask & EVENT_READABLE) { | |
245 | event->read_cb = ctxt; | |
246 | } | |
247 | if (mask & EVENT_WRITABLE) { | |
248 | event->write_cb = ctxt; | |
249 | } | |
250 | ldout(cct, 20) << __func__ << " create event end fd=" << fd << " mask=" << mask | |
251 | << " original mask is " << event->mask << dendl; | |
252 | return 0; | |
253 | } | |
254 | ||
255 | void EventCenter::delete_file_event(int fd, int mask) | |
256 | { | |
11fdf7f2 | 257 | ceph_assert(in_thread() && fd >= 0); |
7c673cae FG |
258 | if (fd >= nevent) { |
259 | ldout(cct, 1) << __func__ << " delete event fd=" << fd << " is equal or greater than nevent=" << nevent | |
260 | << "mask=" << mask << dendl; | |
261 | return ; | |
262 | } | |
263 | EventCenter::FileEvent *event = _get_file_event(fd); | |
264 | ldout(cct, 30) << __func__ << " delete event started fd=" << fd << " mask=" << mask | |
265 | << " original mask is " << event->mask << dendl; | |
266 | if (!event->mask) | |
267 | return ; | |
268 | ||
269 | int r = driver->del_event(fd, event->mask, mask); | |
270 | if (r < 0) { | |
271 | // see create_file_event | |
11fdf7f2 | 272 | ceph_abort_msg("BUG!"); |
7c673cae FG |
273 | } |
274 | ||
275 | if (mask & EVENT_READABLE && event->read_cb) { | |
276 | event->read_cb = nullptr; | |
277 | } | |
278 | if (mask & EVENT_WRITABLE && event->write_cb) { | |
279 | event->write_cb = nullptr; | |
280 | } | |
281 | ||
282 | event->mask = event->mask & (~mask); | |
283 | ldout(cct, 30) << __func__ << " delete event end fd=" << fd << " mask=" << mask | |
284 | << " original mask is " << event->mask << dendl; | |
285 | } | |
286 | ||
287 | uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt) | |
288 | { | |
11fdf7f2 | 289 | ceph_assert(in_thread()); |
7c673cae FG |
290 | uint64_t id = time_event_next_id++; |
291 | ||
292 | ldout(cct, 30) << __func__ << " id=" << id << " trigger after " << microseconds << "us"<< dendl; | |
293 | EventCenter::TimeEvent event; | |
294 | clock_type::time_point expire = clock_type::now() + std::chrono::microseconds(microseconds); | |
295 | event.id = id; | |
296 | event.time_cb = ctxt; | |
297 | std::multimap<clock_type::time_point, TimeEvent>::value_type s_val(expire, event); | |
298 | auto it = time_events.insert(std::move(s_val)); | |
299 | event_map[id] = it; | |
300 | ||
301 | return id; | |
302 | } | |
303 | ||
304 | void EventCenter::delete_time_event(uint64_t id) | |
305 | { | |
11fdf7f2 | 306 | ceph_assert(in_thread()); |
7c673cae FG |
307 | ldout(cct, 30) << __func__ << " id=" << id << dendl; |
308 | if (id >= time_event_next_id || id == 0) | |
309 | return ; | |
310 | ||
311 | auto it = event_map.find(id); | |
312 | if (it == event_map.end()) { | |
313 | ldout(cct, 10) << __func__ << " id=" << id << " not found" << dendl; | |
314 | return ; | |
315 | } | |
316 | ||
317 | time_events.erase(it->second); | |
318 | event_map.erase(it); | |
319 | } | |
320 | ||
321 | void EventCenter::wakeup() | |
322 | { | |
323 | // No need to wake up since we never sleep | |
324 | if (!pollers.empty() || !driver->need_wakeup()) | |
325 | return ; | |
326 | ||
327 | ldout(cct, 20) << __func__ << dendl; | |
328 | char buf = 'c'; | |
329 | // wake up "event_wait" | |
330 | int n = write(notify_send_fd, &buf, sizeof(buf)); | |
331 | if (n < 0) { | |
332 | if (errno != EAGAIN) { | |
333 | ldout(cct, 1) << __func__ << " write notify pipe failed: " << cpp_strerror(errno) << dendl; | |
334 | ceph_abort(); | |
335 | } | |
336 | } | |
337 | } | |
338 | ||
339 | int EventCenter::process_time_events() | |
340 | { | |
341 | int processed = 0; | |
342 | clock_type::time_point now = clock_type::now(); | |
343 | ldout(cct, 30) << __func__ << " cur time is " << now << dendl; | |
344 | ||
345 | while (!time_events.empty()) { | |
346 | auto it = time_events.begin(); | |
347 | if (now >= it->first) { | |
348 | TimeEvent &e = it->second; | |
349 | EventCallbackRef cb = e.time_cb; | |
350 | uint64_t id = e.id; | |
351 | time_events.erase(it); | |
352 | event_map.erase(id); | |
353 | ldout(cct, 30) << __func__ << " process time event: id=" << id << dendl; | |
354 | processed++; | |
355 | cb->do_request(id); | |
356 | } else { | |
357 | break; | |
358 | } | |
359 | } | |
360 | ||
361 | return processed; | |
362 | } | |
363 | ||
11fdf7f2 | 364 | int EventCenter::process_events(unsigned timeout_microseconds, ceph::timespan *working_dur) |
7c673cae FG |
365 | { |
366 | struct timeval tv; | |
367 | int numevents; | |
368 | bool trigger_time = false; | |
369 | auto now = clock_type::now(); | |
370 | ||
371 | auto it = time_events.begin(); | |
372 | bool blocking = pollers.empty() && !external_num_events.load(); | |
373 | // If exists external events or poller, don't block | |
374 | if (!blocking) { | |
375 | if (it != time_events.end() && now >= it->first) | |
376 | trigger_time = true; | |
377 | tv.tv_sec = 0; | |
378 | tv.tv_usec = 0; | |
379 | } else { | |
380 | clock_type::time_point shortest; | |
381 | shortest = now + std::chrono::microseconds(timeout_microseconds); | |
382 | ||
383 | if (it != time_events.end() && shortest >= it->first) { | |
384 | ldout(cct, 30) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl; | |
385 | shortest = it->first; | |
386 | trigger_time = true; | |
387 | if (shortest > now) { | |
388 | timeout_microseconds = std::chrono::duration_cast<std::chrono::microseconds>( | |
389 | shortest - now).count(); | |
390 | } else { | |
391 | shortest = now; | |
392 | timeout_microseconds = 0; | |
393 | } | |
394 | } | |
395 | tv.tv_sec = timeout_microseconds / 1000000; | |
396 | tv.tv_usec = timeout_microseconds % 1000000; | |
397 | } | |
398 | ||
399 | ldout(cct, 30) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl; | |
400 | vector<FiredFileEvent> fired_events; | |
401 | numevents = driver->event_wait(fired_events, &tv); | |
31f18b77 | 402 | auto working_start = ceph::mono_clock::now(); |
7c673cae FG |
403 | for (int j = 0; j < numevents; j++) { |
404 | int rfired = 0; | |
405 | FileEvent *event; | |
406 | EventCallbackRef cb; | |
407 | event = _get_file_event(fired_events[j].fd); | |
408 | ||
409 | /* note the event->mask & mask & ... code: maybe an already processed | |
410 | * event removed an element that fired and we still didn't | |
411 | * processed, so we check if the event is still valid. */ | |
412 | if (event->mask & fired_events[j].mask & EVENT_READABLE) { | |
413 | rfired = 1; | |
414 | cb = event->read_cb; | |
415 | cb->do_request(fired_events[j].fd); | |
416 | } | |
417 | ||
418 | if (event->mask & fired_events[j].mask & EVENT_WRITABLE) { | |
419 | if (!rfired || event->read_cb != event->write_cb) { | |
420 | cb = event->write_cb; | |
421 | cb->do_request(fired_events[j].fd); | |
422 | } | |
423 | } | |
424 | ||
425 | ldout(cct, 30) << __func__ << " event_wq process is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl; | |
426 | } | |
427 | ||
428 | if (trigger_time) | |
429 | numevents += process_time_events(); | |
430 | ||
431 | if (external_num_events.load()) { | |
432 | external_lock.lock(); | |
433 | deque<EventCallbackRef> cur_process; | |
434 | cur_process.swap(external_events); | |
435 | external_num_events.store(0); | |
436 | external_lock.unlock(); | |
11fdf7f2 | 437 | numevents += cur_process.size(); |
7c673cae FG |
438 | while (!cur_process.empty()) { |
439 | EventCallbackRef e = cur_process.front(); | |
440 | ldout(cct, 30) << __func__ << " do " << e << dendl; | |
441 | e->do_request(0); | |
442 | cur_process.pop_front(); | |
7c673cae FG |
443 | } |
444 | } | |
445 | ||
446 | if (!numevents && !blocking) { | |
447 | for (uint32_t i = 0; i < pollers.size(); i++) | |
448 | numevents += pollers[i]->poll(); | |
449 | } | |
450 | ||
31f18b77 FG |
451 | if (working_dur) |
452 | *working_dur = ceph::mono_clock::now() - working_start; | |
7c673cae FG |
453 | return numevents; |
454 | } | |
455 | ||
456 | void EventCenter::dispatch_event_external(EventCallbackRef e) | |
457 | { | |
11fdf7f2 TL |
458 | uint64_t num = 0; |
459 | { | |
460 | std::lock_guard lock{external_lock}; | |
461 | if (external_num_events > 0 && *external_events.rbegin() == e) { | |
462 | return; | |
463 | } | |
464 | external_events.push_back(e); | |
465 | num = ++external_num_events; | |
466 | } | |
467 | if (num == 1 && !in_thread()) | |
7c673cae FG |
468 | wakeup(); |
469 | ||
470 | ldout(cct, 30) << __func__ << " " << e << " pending " << num << dendl; | |
471 | } |