1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #include "common/errno.h"
21 #include "dpdk/EventDPDK.h"
25 #include "EventEpoll.h"
28 #include "EventKqueue.h"
30 #include "EventSelect.h"
34 #define dout_subsys ceph_subsys_ms
37 #define dout_prefix *_dout << "EventCallback "
// C_handle_notify is the EventCallback that EventCenter::set_owner()
// registers for EVENT_READABLE on notify_receive_fd, the read end of the
// notify pipe that EventCenter::wakeup() writes to. Its job is to read
// those wakeup bytes back out of the pipe.
38 class C_handle_notify
: public EventCallback
{
// Keeps the owning EventCenter and the CephContext used for logging.
43 C_handle_notify(EventCenter
*c
, CephContext
*cc
): center(c
), cct(cc
) {}
// Invoked with the readable fd (the notify pipe's read end); reads up to
// sizeof(c) bytes into the buffer c. A failed read is logged at level 1
// together with the errno string.
44 void do_request(int fd_or_id
) override
{
48 r
= read(fd_or_id
, c
, sizeof(c
));
51 ldout(cct
, 1) << __func__
<< " read notify pipe failed: " << cpp_strerror(errno
) << dendl
;
58 #define dout_prefix _event_prefix(_dout)
64 * EventCenter object through which the poller will be invoked; the
65 * poller appends itself to that center's list of pollers.
67 * Human readable name that can be printed out in debugging messages
68 * about the poller. The name of the superclass is probably sufficient
// Poller constructor: records the owning center and the poller's name,
// then registers the poller with that center. `slot` is initialized to
// the size of owner->pollers *before* push_back(this), so it equals this
// poller's index in that vector (the destructor relies on this).
// NOTE(review): slot's initializer reads `owner`, so `owner` must be
// declared before `slot` in the class definition -- confirm in the header.
71 EventCenter::Poller::Poller(EventCenter
* center
, const string
& name
)
72 : owner(center
), poller_name(name
), slot(owner
->pollers
.size())
74 owner
->pollers
.push_back(this);
// Poller destructor: unregisters this poller from owner->pollers in
// constant time by overwriting its entry with the last one and popping.
80 EventCenter::Poller::~Poller()
82 // Erase this Poller from the vector by overwriting it with the
83 // poller that used to be the last one in the vector.
85 // Note: this approach is reentrant (it is safe to delete a
86 // poller from a poller callback, which means that the poll
87 // method is in the middle of scanning the list of all pollers;
88 // the worst that will happen is that the poller that got moved
89 // may not be invoked in the current scan).
90 owner
->pollers
[slot
] = owner
->pollers
.back();
// Tell the moved poller its new index. If this poller was already the
// last element, this just rewrites our own slot, and pop_back() below
// then removes the only entry that points at us.
91 owner
->pollers
[slot
]->slot
= slot
;
92 owner
->pollers
.pop_back();
// Log prefix used by this file's ldout/lderr lines (dout_prefix expands
// to _event_prefix(_dout)). Prints the center's address, nevent and
// time_event_next_id.
96 ostream
& EventCenter::_event_prefix(std::ostream
*_dout
)
98 return *_dout
<< "Event(" << this << " nevent=" << nevent
99 << " time_id=" << time_event_next_id
<< ").";
// Initializes the center with room for n file events. It:
//  - creates one event driver and initializes it with n;
//  - sizes file_events to n;
//  - for drivers that need explicit wakeups, creates the notify pipe,
//    stores fds[0] as notify_receive_fd and fds[1] as notify_send_fd,
//    and makes both nonblocking.
// Failures are reported with lderr.
// NOTE(review): `i` and `t` are presumably the center's idx and type (both
// are used by set_owner); their assignment is not visible here.
102 int EventCenter::init(int n
, unsigned i
, const std::string
&t
)
104 // can't init multi times
// Exactly one of the following drivers is constructed; presumably the
// choice depends on the transport type / platform.
112 driver
= new DPDKDriver(cct
);
116 driver
= new EpollDriver(cct
);
119 driver
= new KqueueDriver(cct
);
121 driver
= new SelectDriver(cct
);
127 lderr(cct
) << __func__
<< " failed to create event driver " << dendl
;
131 int r
= driver
->init(this, n
);
133 lderr(cct
) << __func__
<< " failed to init event driver." << dendl
;
137 file_events
.resize(n
);
// Drivers that do not need wakeup() presumably stop here. The notify
// pipe set up below exists only so wakeup() can interrupt event_wait.
140 if (!driver
->need_wakeup())
145 lderr(cct
) << __func__
<< " can't create notify pipe" << dendl
;
149 notify_receive_fd
= fds
[0];
150 notify_send_fd
= fds
[1];
151 r
= net
.set_nonblock(notify_receive_fd
);
155 r
= net
.set_nonblock(notify_send_fd
);
// Destructor. It:
//  - under external_lock, pops every callback still queued in
//    external_events;
//  - asserts that no time events remain;
//  - closes whichever notify pipe fds were opened (>= 0);
//  - deletes notify_handler.
// NOTE(review): what is done with each popped EventCallbackRef is on a
// line not shown here -- confirm ownership of queued callbacks.
163 EventCenter::~EventCenter()
166 std::lock_guard
<std::mutex
> l(external_lock
);
167 while (!external_events
.empty()) {
168 EventCallbackRef e
= external_events
.front();
171 external_events
.pop_front();
174 assert(time_events
.empty());
176 if (notify_receive_fd
>= 0)
177 ::close(notify_receive_fd
);
178 if (notify_send_fd
>= 0)
179 ::close(notify_send_fd
);
183 delete notify_handler
;
// Binds this center to the calling thread (owner = pthread_self()).
// If global_centers is not yet set, it:
//  - looks up or creates the AssociatedCenters singleton named
//    "AsyncMessenger::EventCenter::global_center::" + type;
//  - records this center in it at index idx.
// When the driver needs wakeups, it also creates the C_handle_notify
// callback and registers it for EVENT_READABLE on notify_receive_fd.
187 void EventCenter::set_owner()
189 owner
= pthread_self();
190 ldout(cct
, 2) << __func__
<< " idx=" << idx
<< " owner=" << owner
<< dendl
;
191 if (!global_centers
) {
192 cct
->lookup_or_create_singleton_object
<EventCenter::AssociatedCenters
>(
193 global_centers
, "AsyncMessenger::EventCenter::global_center::"+type
);
194 assert(global_centers
);
195 global_centers
->centers
[idx
] = this;
196 if (driver
->need_wakeup()) {
197 notify_handler
= new C_handle_notify(this, cct
);
198 int r
= create_file_event(notify_receive_fd
, EVENT_READABLE
, notify_handler
);
// Registers ctxt as the callback for the EVENT_READABLE and/or
// EVENT_WRITABLE bits of mask on fd. When fd does not fit (presumably
// fd >= nevent, per the "event count exceed" log below), the driver and
// file_events are first grown to new_size. The driver is then asked to
// add the bits via add_event(fd, old_mask, mask), and the matching
// read_cb/write_cb are set to ctxt.
204 int EventCenter::create_file_event(int fd
, int mask
, EventCallbackRef ctxt
)
209 int new_size
= nevent
<< 2;
// new_size must end up strictly greater than fd. file_events is resized
// to exactly new_size and fd is then used as an index into it (presumably
// via _get_file_event()). The loop therefore has to keep growing while
// fd == new_size too; with `>` such an fd would be one past the end.
210 while (fd
>= new_size
)
212 ldout(cct
, 20) << __func__
<< " event count exceed " << nevent
<< ", expand to " << new_size
<< dendl
;
213 r
= driver
->resize_events(new_size
);
215 lderr(cct
) << __func__
<< " event count is exceed." << dendl
;
218 file_events
.resize(new_size
);
222 EventCenter::FileEvent
*event
= _get_file_event(fd
);
223 ldout(cct
, 20) << __func__
<< " create event started fd=" << fd
<< " mask=" << mask
224 << " original mask is " << event
->mask
<< dendl
;
// Presumably an early exit: the requested mask is already registered,
// so no driver call is needed.
225 if (event
->mask
== mask
)
228 r
= driver
->add_event(fd
, event
->mask
, mask
);
230 // Actually we don't allow any failed error code, caller doesn't prepare to
231 // handle error status. So now we need to assert failure here. In practice,
232 // add_event shouldn't report error, otherwise it must be an internal bug!
238 if (mask
& EVENT_READABLE
) {
239 event
->read_cb
= ctxt
;
241 if (mask
& EVENT_WRITABLE
) {
242 event
->write_cb
= ctxt
;
244 ldout(cct
, 20) << __func__
<< " create event end fd=" << fd
<< " mask=" << mask
245 << " original mask is " << event
->mask
<< dendl
;
// Removes the EVENT_READABLE and/or EVENT_WRITABLE bits of mask from fd's
// registration. It asks the driver to delete them via
// del_event(fd, old_mask, mask), clears the matching read_cb/write_cb,
// and removes the bits from event->mask.
// Asserts in_thread() and fd >= 0.
249 void EventCenter::delete_file_event(int fd
, int mask
)
251 assert(in_thread() && fd
>= 0);
// Logged (per its text) for an fd that is not below nevent. Such an fd
// presumably was never registered, since create_file_event grows nevent
// to cover every fd it adds. The leading space in " mask=" keeps the
// nevent value from running into the next field.
253 ldout(cct
, 1) << __func__
<< " delete event fd=" << fd
<< " is equal or greater than nevent=" << nevent
254 << " mask=" << mask
<< dendl
;
257 EventCenter::FileEvent
*event
= _get_file_event(fd
);
258 ldout(cct
, 30) << __func__
<< " delete event started fd=" << fd
<< " mask=" << mask
259 << " original mask is " << event
->mask
<< dendl
;
263 int r
= driver
->del_event(fd
, event
->mask
, mask
);
265 // see create_file_event
269 if (mask
& EVENT_READABLE
&& event
->read_cb
) {
270 event
->read_cb
= nullptr;
272 if (mask
& EVENT_WRITABLE
&& event
->write_cb
) {
273 event
->write_cb
= nullptr;
276 event
->mask
= event
->mask
& (~mask
);
277 ldout(cct
, 30) << __func__
<< " delete event end fd=" << fd
<< " mask=" << mask
278 << " original mask is " << event
->mask
<< dendl
;
// Schedules ctxt to run once, `microseconds` after clock_type::now().
// It takes the next id from time_event_next_id (post-increment) and
// inserts the event into time_events keyed by its expiry time.
// The resulting iterator `it` is presumably stored in event_map[id]
// (delete_time_event looks ids up there and erases it->second from
// time_events), and id is presumably the return value.
281 uint64_t EventCenter::create_time_event(uint64_t microseconds
, EventCallbackRef ctxt
)
284 uint64_t id
= time_event_next_id
++;
286 ldout(cct
, 30) << __func__
<< " id=" << id
<< " trigger after " << microseconds
<< "us"<< dendl
;
287 EventCenter::TimeEvent event
;
288 clock_type::time_point expire
= clock_type::now() + std::chrono::microseconds(microseconds
);
290 event
.time_cb
= ctxt
;
// Multimap entry: several events may share the same expiry time.
291 std::multimap
<clock_type::time_point
, TimeEvent
>::value_type
s_val(expire
, event
);
292 auto it
= time_events
.insert(std::move(s_val
));
// Cancels a pending time event by id. An id of 0 or one that is
// >= time_event_next_id is checked first (presumably an early return).
// Ids missing from event_map are logged at level 10. Otherwise the event
// is erased from time_events through the iterator stored in event_map.
298 void EventCenter::delete_time_event(uint64_t id
)
301 ldout(cct
, 30) << __func__
<< " id=" << id
<< dendl
;
302 if (id
>= time_event_next_id
|| id
== 0)
305 auto it
= event_map
.find(id
);
306 if (it
== event_map
.end()) {
307 ldout(cct
, 10) << __func__
<< " id=" << id
<< " not found" << dendl
;
311 time_events
.erase(it
->second
);
// Interrupts a blocking event_wait by writing buf to notify_send_fd;
// C_handle_notify reads it from the other end of the pipe.
// Does nothing when pollers are registered (process_events never blocks
// then) or the driver does not need wakeups.
// A failed write whose errno is not EAGAIN is logged at level 1. EAGAIN
// (nonblocking pipe full) is presumably harmless because a wakeup is
// already pending.
315 void EventCenter::wakeup()
317 // No need to wake up since we never sleep
318 if (!pollers
.empty() || !driver
->need_wakeup())
321 ldout(cct
, 20) << __func__
<< dendl
;
323 // wake up "event_wait"
324 int n
= write(notify_send_fd
, &buf
, sizeof(buf
));
326 if (errno
!= EAGAIN
) {
327 ldout(cct
, 1) << __func__
<< " write notify pipe failed: " << cpp_strerror(errno
) << dendl
;
// Runs the time events that are due. `now` is sampled once, so an event
// that becomes due while callbacks run waits for a later call.
// Each due entry is taken from the front of time_events (the smallest
// expiry key). Its callback is copied out and the entry is erased before
// the callback presumably runs.
// The result is added to numevents by process_events, so it presumably
// counts the events processed.
333 int EventCenter::process_time_events()
336 clock_type::time_point now
= clock_type::now();
337 ldout(cct
, 30) << __func__
<< " cur time is " << now
<< dendl
;
339 while (!time_events
.empty()) {
340 auto it
= time_events
.begin();
341 if (now
>= it
->first
) {
// `e` refers into the node that erase(it) destroys below. Anything
// needed from it (cb, and presumably `id`) must be copied out before
// the erase.
342 TimeEvent
&e
= it
->second
;
343 EventCallbackRef cb
= e
.time_cb
;
345 time_events
.erase(it
);
347 ldout(cct
, 30) << __func__
<< " process time event: id=" << id
<< dendl
;
// One iteration of the event loop:
//  1. Computes the driver wait timeout (tv) from timeout_microseconds.
//     It is shortened to the earliest pending time event, or set to 0 if
//     that event is already due. `blocking` is false when pollers or
//     external events exist, and the loop must then not block.
//  2. Waits in driver->event_wait() and dispatches each fired file event:
//     the read callback for a readable fire, then the write callback for
//     a writable fire unless it is the callback that already ran.
//  3. Adds due time events (process_time_events, presumably only when
//     trigger_time is set) to numevents, then runs the external events
//     queued by dispatch_event_external.
//  4. If nothing was processed and the loop is non-blocking, polls every
//     registered poller once.
358 int EventCenter::process_events(int timeout_microseconds
)
362 bool trigger_time
= false;
363 auto now
= clock_type::now();
365 auto it
= time_events
.begin();
366 bool blocking
= pollers
.empty() && !external_num_events
.load();
367 // If exists external events or poller, don't block
369 if (it
!= time_events
.end() && now
>= it
->first
)
// Deadline for this wait: now + timeout_microseconds, pulled in to the
// earliest time event's expiry if that comes sooner.
374 clock_type::time_point shortest
;
375 shortest
= now
+ std::chrono::microseconds(timeout_microseconds
);
377 if (it
!= time_events
.end() && shortest
>= it
->first
) {
378 ldout(cct
, 30) << __func__
<< " shortest is " << shortest
<< " it->first is " << it
->first
<< dendl
;
379 shortest
= it
->first
;
381 if (shortest
> now
) {
382 timeout_microseconds
= std::chrono::duration_cast
<std::chrono::microseconds
>(
383 shortest
- now
).count();
386 timeout_microseconds
= 0;
// Split the timeout into the seconds/microseconds timeval for event_wait.
389 tv
.tv_sec
= timeout_microseconds
/ 1000000;
390 tv
.tv_usec
= timeout_microseconds
% 1000000;
393 ldout(cct
, 30) << __func__
<< " wait second " << tv
.tv_sec
<< " usec " << tv
.tv_usec
<< dendl
;
394 vector
<FiredFileEvent
> fired_events
;
395 numevents
= driver
->event_wait(fired_events
, &tv
);
396 for (int j
= 0; j
< numevents
; j
++) {
400 event
= _get_file_event(fired_events
[j
].fd
);
402 /* note the event->mask & mask & ... code: maybe an already processed
403 * event removed an element that fired and we still didn't
404 * processed, so we check if the event is still valid. */
405 if (event
->mask
& fired_events
[j
].mask
& EVENT_READABLE
) {
408 cb
->do_request(fired_events
[j
].fd
);
411 if (event
->mask
& fired_events
[j
].mask
& EVENT_WRITABLE
) {
// Do not invoke one callback twice for the same fire. rfired presumably
// records that the read callback already ran in this iteration.
412 if (!rfired
|| event
->read_cb
!= event
->write_cb
) {
413 cb
= event
->write_cb
;
414 cb
->do_request(fired_events
[j
].fd
);
418 ldout(cct
, 30) << __func__
<< " event_wq process is " << fired_events
[j
].fd
<< " mask is " << fired_events
[j
].mask
<< dendl
;
422 numevents
+= process_time_events();
// Take the whole external queue under external_lock and reset the
// counter, then drain the local copy after unlocking. Callbacks can then
// call dispatch_event_external (which takes external_lock) again.
424 if (external_num_events
.load()) {
425 external_lock
.lock();
426 deque
<EventCallbackRef
> cur_process
;
427 cur_process
.swap(external_events
);
428 external_num_events
.store(0);
429 external_lock
.unlock();
430 while (!cur_process
.empty()) {
431 EventCallbackRef e
= cur_process
.front();
432 ldout(cct
, 30) << __func__
<< " do " << e
<< dendl
;
434 cur_process
.pop_front();
// Pollers run only when nothing else was processed and the loop is in
// non-blocking mode.
439 if (!numevents
&& !blocking
) {
440 for (uint32_t i
= 0; i
< pollers
.size(); i
++)
441 numevents
+= pollers
[i
]->poll();
// Queues callback e for process_events to run.
// external_events and external_num_events are updated under
// external_lock. `wake` is true only for the first event queued since
// process_events last drained the queue (it resets external_num_events
// to 0). A wakeup is presumably issued only in that case, and only when
// in_thread() is false.
447 void EventCenter::dispatch_event_external(EventCallbackRef e
)
449 external_lock
.lock();
450 external_events
.push_back(e
);
451 bool wake
= !external_num_events
.load();
452 uint64_t num
= ++external_num_events
;
453 external_lock
.unlock();
454 if (!in_thread() && wake
)
457 ldout(cct
, 30) << __func__
<< " " << e
<< " pending " << num
<< dendl
;