]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/Event.cc
update sources to v12.1.0
[ceph.git] / ceph / src / msg / async / Event.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#include "common/errno.h"
18#include "Event.h"
19
20#ifdef HAVE_DPDK
21#include "dpdk/EventDPDK.h"
22#endif
23
24#ifdef HAVE_EPOLL
25#include "EventEpoll.h"
26#else
27#ifdef HAVE_KQUEUE
28#include "EventKqueue.h"
29#else
30#include "EventSelect.h"
31#endif
32#endif
33
34#define dout_subsys ceph_subsys_ms
35
36#undef dout_prefix
37#define dout_prefix *_dout << "EventCallback "
38class C_handle_notify : public EventCallback {
39 EventCenter *center;
40 CephContext *cct;
41
42 public:
43 C_handle_notify(EventCenter *c, CephContext *cc): center(c), cct(cc) {}
44 void do_request(int fd_or_id) override {
45 char c[256];
46 int r = 0;
47 do {
48 r = read(fd_or_id, c, sizeof(c));
49 if (r < 0) {
50 if (errno != EAGAIN)
51 ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl;
52 }
53 } while (r > 0);
54 }
55};
56
57#undef dout_prefix
58#define dout_prefix _event_prefix(_dout)
59
60/**
61 * Construct a Poller.
62 *
63 * \param center
64 * EventCenter object through which the poller will be invoked (defaults
65 * to the global #RAMCloud::center object).
66 * \param pollerName
67 * Human readable name that can be printed out in debugging messages
68 * about the poller. The name of the superclass is probably sufficient
69 * for most cases.
70 */
71EventCenter::Poller::Poller(EventCenter* center, const string& name)
72 : owner(center), poller_name(name), slot(owner->pollers.size())
73{
74 owner->pollers.push_back(this);
75}
76
77/**
78 * Destroy a Poller.
79 */
80EventCenter::Poller::~Poller()
81{
82 // Erase this Poller from the vector by overwriting it with the
83 // poller that used to be the last one in the vector.
84 //
85 // Note: this approach is reentrant (it is safe to delete a
86 // poller from a poller callback, which means that the poll
87 // method is in the middle of scanning the list of all pollers;
88 // the worst that will happen is that the poller that got moved
89 // may not be invoked in the current scan).
90 owner->pollers[slot] = owner->pollers.back();
91 owner->pollers[slot]->slot = slot;
92 owner->pollers.pop_back();
93 slot = -1;
94}
95
96ostream& EventCenter::_event_prefix(std::ostream *_dout)
97{
98 return *_dout << "Event(" << this << " nevent=" << nevent
99 << " time_id=" << time_event_next_id << ").";
100}
101
102int EventCenter::init(int n, unsigned i, const std::string &t)
103{
104 // can't init multi times
105 assert(nevent == 0);
106
107 type = t;
108 idx = i;
109
110 if (t == "dpdk") {
111#ifdef HAVE_DPDK
112 driver = new DPDKDriver(cct);
113#endif
114 } else {
115#ifdef HAVE_EPOLL
116 driver = new EpollDriver(cct);
117#else
118#ifdef HAVE_KQUEUE
119 driver = new KqueueDriver(cct);
120#else
121 driver = new SelectDriver(cct);
122#endif
123#endif
124 }
125
126 if (!driver) {
127 lderr(cct) << __func__ << " failed to create event driver " << dendl;
128 return -1;
129 }
130
131 int r = driver->init(this, n);
132 if (r < 0) {
133 lderr(cct) << __func__ << " failed to init event driver." << dendl;
134 return r;
135 }
136
137 file_events.resize(n);
138 nevent = n;
139
140 if (!driver->need_wakeup())
141 return 0;
142
143 int fds[2];
144 if (pipe(fds) < 0) {
145 lderr(cct) << __func__ << " can't create notify pipe" << dendl;
146 return -errno;
147 }
148
149 notify_receive_fd = fds[0];
150 notify_send_fd = fds[1];
151 r = net.set_nonblock(notify_receive_fd);
152 if (r < 0) {
153 return r;
154 }
155 r = net.set_nonblock(notify_send_fd);
156 if (r < 0) {
157 return r;
158 }
159
160 return r;
161}
162
163EventCenter::~EventCenter()
164{
165 {
166 std::lock_guard<std::mutex> l(external_lock);
167 while (!external_events.empty()) {
168 EventCallbackRef e = external_events.front();
169 if (e)
170 e->do_request(0);
171 external_events.pop_front();
172 }
173 }
174 assert(time_events.empty());
175
176 if (notify_receive_fd >= 0)
177 ::close(notify_receive_fd);
178 if (notify_send_fd >= 0)
179 ::close(notify_send_fd);
180
181 delete driver;
182 if (notify_handler)
183 delete notify_handler;
184}
185
186
187void EventCenter::set_owner()
188{
189 owner = pthread_self();
190 ldout(cct, 2) << __func__ << " idx=" << idx << " owner=" << owner << dendl;
191 if (!global_centers) {
192 cct->lookup_or_create_singleton_object<EventCenter::AssociatedCenters>(
193 global_centers, "AsyncMessenger::EventCenter::global_center::"+type);
194 assert(global_centers);
195 global_centers->centers[idx] = this;
196 if (driver->need_wakeup()) {
197 notify_handler = new C_handle_notify(this, cct);
198 int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler);
199 assert(r == 0);
200 }
201 }
202}
203
204int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
205{
206 assert(in_thread());
207 int r = 0;
208 if (fd >= nevent) {
209 int new_size = nevent << 2;
210 while (fd > new_size)
211 new_size <<= 2;
212 ldout(cct, 20) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl;
213 r = driver->resize_events(new_size);
214 if (r < 0) {
215 lderr(cct) << __func__ << " event count is exceed." << dendl;
216 return -ERANGE;
217 }
218 file_events.resize(new_size);
219 nevent = new_size;
220 }
221
222 EventCenter::FileEvent *event = _get_file_event(fd);
223 ldout(cct, 20) << __func__ << " create event started fd=" << fd << " mask=" << mask
224 << " original mask is " << event->mask << dendl;
225 if (event->mask == mask)
226 return 0;
227
228 r = driver->add_event(fd, event->mask, mask);
229 if (r < 0) {
230 // Actually we don't allow any failed error code, caller doesn't prepare to
231 // handle error status. So now we need to assert failure here. In practice,
232 // add_event shouldn't report error, otherwise it must be a innermost bug!
233 assert(0 == "BUG!");
234 return r;
235 }
236
237 event->mask |= mask;
238 if (mask & EVENT_READABLE) {
239 event->read_cb = ctxt;
240 }
241 if (mask & EVENT_WRITABLE) {
242 event->write_cb = ctxt;
243 }
244 ldout(cct, 20) << __func__ << " create event end fd=" << fd << " mask=" << mask
245 << " original mask is " << event->mask << dendl;
246 return 0;
247}
248
249void EventCenter::delete_file_event(int fd, int mask)
250{
251 assert(in_thread() && fd >= 0);
252 if (fd >= nevent) {
253 ldout(cct, 1) << __func__ << " delete event fd=" << fd << " is equal or greater than nevent=" << nevent
254 << "mask=" << mask << dendl;
255 return ;
256 }
257 EventCenter::FileEvent *event = _get_file_event(fd);
258 ldout(cct, 30) << __func__ << " delete event started fd=" << fd << " mask=" << mask
259 << " original mask is " << event->mask << dendl;
260 if (!event->mask)
261 return ;
262
263 int r = driver->del_event(fd, event->mask, mask);
264 if (r < 0) {
265 // see create_file_event
266 assert(0 == "BUG!");
267 }
268
269 if (mask & EVENT_READABLE && event->read_cb) {
270 event->read_cb = nullptr;
271 }
272 if (mask & EVENT_WRITABLE && event->write_cb) {
273 event->write_cb = nullptr;
274 }
275
276 event->mask = event->mask & (~mask);
277 ldout(cct, 30) << __func__ << " delete event end fd=" << fd << " mask=" << mask
278 << " original mask is " << event->mask << dendl;
279}
280
281uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt)
282{
283 assert(in_thread());
284 uint64_t id = time_event_next_id++;
285
286 ldout(cct, 30) << __func__ << " id=" << id << " trigger after " << microseconds << "us"<< dendl;
287 EventCenter::TimeEvent event;
288 clock_type::time_point expire = clock_type::now() + std::chrono::microseconds(microseconds);
289 event.id = id;
290 event.time_cb = ctxt;
291 std::multimap<clock_type::time_point, TimeEvent>::value_type s_val(expire, event);
292 auto it = time_events.insert(std::move(s_val));
293 event_map[id] = it;
294
295 return id;
296}
297
298void EventCenter::delete_time_event(uint64_t id)
299{
300 assert(in_thread());
301 ldout(cct, 30) << __func__ << " id=" << id << dendl;
302 if (id >= time_event_next_id || id == 0)
303 return ;
304
305 auto it = event_map.find(id);
306 if (it == event_map.end()) {
307 ldout(cct, 10) << __func__ << " id=" << id << " not found" << dendl;
308 return ;
309 }
310
311 time_events.erase(it->second);
312 event_map.erase(it);
313}
314
315void EventCenter::wakeup()
316{
317 // No need to wake up since we never sleep
318 if (!pollers.empty() || !driver->need_wakeup())
319 return ;
320
321 ldout(cct, 20) << __func__ << dendl;
322 char buf = 'c';
323 // wake up "event_wait"
324 int n = write(notify_send_fd, &buf, sizeof(buf));
325 if (n < 0) {
326 if (errno != EAGAIN) {
327 ldout(cct, 1) << __func__ << " write notify pipe failed: " << cpp_strerror(errno) << dendl;
328 ceph_abort();
329 }
330 }
331}
332
333int EventCenter::process_time_events()
334{
335 int processed = 0;
336 clock_type::time_point now = clock_type::now();
337 ldout(cct, 30) << __func__ << " cur time is " << now << dendl;
338
339 while (!time_events.empty()) {
340 auto it = time_events.begin();
341 if (now >= it->first) {
342 TimeEvent &e = it->second;
343 EventCallbackRef cb = e.time_cb;
344 uint64_t id = e.id;
345 time_events.erase(it);
346 event_map.erase(id);
347 ldout(cct, 30) << __func__ << " process time event: id=" << id << dendl;
348 processed++;
349 cb->do_request(id);
350 } else {
351 break;
352 }
353 }
354
355 return processed;
356}
357
31f18b77 358int EventCenter::process_events(int timeout_microseconds, ceph::timespan *working_dur)
7c673cae
FG
359{
360 struct timeval tv;
361 int numevents;
362 bool trigger_time = false;
363 auto now = clock_type::now();
364
365 auto it = time_events.begin();
366 bool blocking = pollers.empty() && !external_num_events.load();
367 // If exists external events or poller, don't block
368 if (!blocking) {
369 if (it != time_events.end() && now >= it->first)
370 trigger_time = true;
371 tv.tv_sec = 0;
372 tv.tv_usec = 0;
373 } else {
374 clock_type::time_point shortest;
375 shortest = now + std::chrono::microseconds(timeout_microseconds);
376
377 if (it != time_events.end() && shortest >= it->first) {
378 ldout(cct, 30) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl;
379 shortest = it->first;
380 trigger_time = true;
381 if (shortest > now) {
382 timeout_microseconds = std::chrono::duration_cast<std::chrono::microseconds>(
383 shortest - now).count();
384 } else {
385 shortest = now;
386 timeout_microseconds = 0;
387 }
388 }
389 tv.tv_sec = timeout_microseconds / 1000000;
390 tv.tv_usec = timeout_microseconds % 1000000;
391 }
392
393 ldout(cct, 30) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
394 vector<FiredFileEvent> fired_events;
395 numevents = driver->event_wait(fired_events, &tv);
31f18b77 396 auto working_start = ceph::mono_clock::now();
7c673cae
FG
397 for (int j = 0; j < numevents; j++) {
398 int rfired = 0;
399 FileEvent *event;
400 EventCallbackRef cb;
401 event = _get_file_event(fired_events[j].fd);
402
403 /* note the event->mask & mask & ... code: maybe an already processed
404 * event removed an element that fired and we still didn't
405 * processed, so we check if the event is still valid. */
406 if (event->mask & fired_events[j].mask & EVENT_READABLE) {
407 rfired = 1;
408 cb = event->read_cb;
409 cb->do_request(fired_events[j].fd);
410 }
411
412 if (event->mask & fired_events[j].mask & EVENT_WRITABLE) {
413 if (!rfired || event->read_cb != event->write_cb) {
414 cb = event->write_cb;
415 cb->do_request(fired_events[j].fd);
416 }
417 }
418
419 ldout(cct, 30) << __func__ << " event_wq process is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl;
420 }
421
422 if (trigger_time)
423 numevents += process_time_events();
424
425 if (external_num_events.load()) {
426 external_lock.lock();
427 deque<EventCallbackRef> cur_process;
428 cur_process.swap(external_events);
429 external_num_events.store(0);
430 external_lock.unlock();
431 while (!cur_process.empty()) {
432 EventCallbackRef e = cur_process.front();
433 ldout(cct, 30) << __func__ << " do " << e << dendl;
434 e->do_request(0);
435 cur_process.pop_front();
436 numevents++;
437 }
438 }
439
440 if (!numevents && !blocking) {
441 for (uint32_t i = 0; i < pollers.size(); i++)
442 numevents += pollers[i]->poll();
443 }
444
31f18b77
FG
445 if (working_dur)
446 *working_dur = ceph::mono_clock::now() - working_start;
7c673cae
FG
447 return numevents;
448}
449
450void EventCenter::dispatch_event_external(EventCallbackRef e)
451{
452 external_lock.lock();
453 external_events.push_back(e);
454 bool wake = !external_num_events.load();
455 uint64_t num = ++external_num_events;
456 external_lock.unlock();
457 if (!in_thread() && wake)
458 wakeup();
459
460 ldout(cct, 30) << __func__ << " " << e << " pending " << num << dendl;
461}