]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/Event.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / msg / async / Event.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
91327a77 17#include "include/compat.h"
7c673cae
FG
18#include "common/errno.h"
19#include "Event.h"
20
21#ifdef HAVE_DPDK
22#include "dpdk/EventDPDK.h"
23#endif
24
25#ifdef HAVE_EPOLL
26#include "EventEpoll.h"
27#else
28#ifdef HAVE_KQUEUE
29#include "EventKqueue.h"
30#else
31#include "EventSelect.h"
32#endif
33#endif
34
35#define dout_subsys ceph_subsys_ms
36
37#undef dout_prefix
38#define dout_prefix *_dout << "EventCallback "
39class C_handle_notify : public EventCallback {
40 EventCenter *center;
41 CephContext *cct;
42
43 public:
44 C_handle_notify(EventCenter *c, CephContext *cc): center(c), cct(cc) {}
11fdf7f2 45 void do_request(uint64_t fd_or_id) override {
7c673cae
FG
46 char c[256];
47 int r = 0;
48 do {
49 r = read(fd_or_id, c, sizeof(c));
50 if (r < 0) {
51 if (errno != EAGAIN)
52 ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl;
53 }
54 } while (r > 0);
55 }
56};
57
58#undef dout_prefix
59#define dout_prefix _event_prefix(_dout)
60
61/**
62 * Construct a Poller.
63 *
64 * \param center
65 * EventCenter object through which the poller will be invoked (defaults
66 * to the global #RAMCloud::center object).
67 * \param pollerName
68 * Human readable name that can be printed out in debugging messages
69 * about the poller. The name of the superclass is probably sufficient
70 * for most cases.
71 */
72EventCenter::Poller::Poller(EventCenter* center, const string& name)
73 : owner(center), poller_name(name), slot(owner->pollers.size())
74{
75 owner->pollers.push_back(this);
76}
77
78/**
79 * Destroy a Poller.
80 */
81EventCenter::Poller::~Poller()
82{
83 // Erase this Poller from the vector by overwriting it with the
84 // poller that used to be the last one in the vector.
85 //
86 // Note: this approach is reentrant (it is safe to delete a
87 // poller from a poller callback, which means that the poll
88 // method is in the middle of scanning the list of all pollers;
89 // the worst that will happen is that the poller that got moved
90 // may not be invoked in the current scan).
91 owner->pollers[slot] = owner->pollers.back();
92 owner->pollers[slot]->slot = slot;
93 owner->pollers.pop_back();
94 slot = -1;
95}
96
97ostream& EventCenter::_event_prefix(std::ostream *_dout)
98{
99 return *_dout << "Event(" << this << " nevent=" << nevent
100 << " time_id=" << time_event_next_id << ").";
101}
102
103int EventCenter::init(int n, unsigned i, const std::string &t)
104{
105 // can't init multi times
11fdf7f2 106 ceph_assert(nevent == 0);
7c673cae
FG
107
108 type = t;
109 idx = i;
110
111 if (t == "dpdk") {
112#ifdef HAVE_DPDK
113 driver = new DPDKDriver(cct);
114#endif
115 } else {
116#ifdef HAVE_EPOLL
117 driver = new EpollDriver(cct);
118#else
119#ifdef HAVE_KQUEUE
120 driver = new KqueueDriver(cct);
121#else
122 driver = new SelectDriver(cct);
123#endif
124#endif
125 }
126
127 if (!driver) {
128 lderr(cct) << __func__ << " failed to create event driver " << dendl;
129 return -1;
130 }
131
132 int r = driver->init(this, n);
133 if (r < 0) {
134 lderr(cct) << __func__ << " failed to init event driver." << dendl;
135 return r;
136 }
137
138 file_events.resize(n);
139 nevent = n;
140
141 if (!driver->need_wakeup())
142 return 0;
143
144 int fds[2];
91327a77
AA
145 if (pipe_cloexec(fds) < 0) {
146 int e = errno;
147 lderr(cct) << __func__ << " can't create notify pipe: " << cpp_strerror(e) << dendl;
148 return -e;
7c673cae
FG
149 }
150
151 notify_receive_fd = fds[0];
152 notify_send_fd = fds[1];
153 r = net.set_nonblock(notify_receive_fd);
154 if (r < 0) {
155 return r;
156 }
157 r = net.set_nonblock(notify_send_fd);
158 if (r < 0) {
159 return r;
160 }
161
162 return r;
163}
164
165EventCenter::~EventCenter()
166{
167 {
168 std::lock_guard<std::mutex> l(external_lock);
169 while (!external_events.empty()) {
170 EventCallbackRef e = external_events.front();
171 if (e)
172 e->do_request(0);
173 external_events.pop_front();
174 }
175 }
11fdf7f2
TL
176 time_events.clear();
177 //assert(time_events.empty());
7c673cae
FG
178
179 if (notify_receive_fd >= 0)
180 ::close(notify_receive_fd);
181 if (notify_send_fd >= 0)
182 ::close(notify_send_fd);
183
184 delete driver;
185 if (notify_handler)
186 delete notify_handler;
187}
188
189
190void EventCenter::set_owner()
191{
192 owner = pthread_self();
193 ldout(cct, 2) << __func__ << " idx=" << idx << " owner=" << owner << dendl;
194 if (!global_centers) {
11fdf7f2
TL
195 global_centers = &cct->lookup_or_create_singleton_object<
196 EventCenter::AssociatedCenters>(
197 "AsyncMessenger::EventCenter::global_center::" + type, true);
198 ceph_assert(global_centers);
7c673cae
FG
199 global_centers->centers[idx] = this;
200 if (driver->need_wakeup()) {
201 notify_handler = new C_handle_notify(this, cct);
202 int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler);
11fdf7f2 203 ceph_assert(r == 0);
7c673cae
FG
204 }
205 }
206}
207
208int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
209{
11fdf7f2 210 ceph_assert(in_thread());
7c673cae
FG
211 int r = 0;
212 if (fd >= nevent) {
213 int new_size = nevent << 2;
94b18763 214 while (fd >= new_size)
7c673cae
FG
215 new_size <<= 2;
216 ldout(cct, 20) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl;
217 r = driver->resize_events(new_size);
218 if (r < 0) {
219 lderr(cct) << __func__ << " event count is exceed." << dendl;
220 return -ERANGE;
221 }
222 file_events.resize(new_size);
223 nevent = new_size;
224 }
225
226 EventCenter::FileEvent *event = _get_file_event(fd);
227 ldout(cct, 20) << __func__ << " create event started fd=" << fd << " mask=" << mask
228 << " original mask is " << event->mask << dendl;
229 if (event->mask == mask)
230 return 0;
231
232 r = driver->add_event(fd, event->mask, mask);
233 if (r < 0) {
234 // Actually we don't allow any failed error code, caller doesn't prepare to
235 // handle error status. So now we need to assert failure here. In practice,
236 // add_event shouldn't report error, otherwise it must be a innermost bug!
11fdf7f2
TL
237 lderr(cct) << __func__ << " add event failed, ret=" << r << " fd=" << fd
238 << " mask=" << mask << " original mask is " << event->mask << dendl;
239 ceph_abort_msg("BUG!");
7c673cae
FG
240 return r;
241 }
242
243 event->mask |= mask;
244 if (mask & EVENT_READABLE) {
245 event->read_cb = ctxt;
246 }
247 if (mask & EVENT_WRITABLE) {
248 event->write_cb = ctxt;
249 }
250 ldout(cct, 20) << __func__ << " create event end fd=" << fd << " mask=" << mask
251 << " original mask is " << event->mask << dendl;
252 return 0;
253}
254
255void EventCenter::delete_file_event(int fd, int mask)
256{
11fdf7f2 257 ceph_assert(in_thread() && fd >= 0);
7c673cae
FG
258 if (fd >= nevent) {
259 ldout(cct, 1) << __func__ << " delete event fd=" << fd << " is equal or greater than nevent=" << nevent
260 << "mask=" << mask << dendl;
261 return ;
262 }
263 EventCenter::FileEvent *event = _get_file_event(fd);
264 ldout(cct, 30) << __func__ << " delete event started fd=" << fd << " mask=" << mask
265 << " original mask is " << event->mask << dendl;
266 if (!event->mask)
267 return ;
268
269 int r = driver->del_event(fd, event->mask, mask);
270 if (r < 0) {
271 // see create_file_event
11fdf7f2 272 ceph_abort_msg("BUG!");
7c673cae
FG
273 }
274
275 if (mask & EVENT_READABLE && event->read_cb) {
276 event->read_cb = nullptr;
277 }
278 if (mask & EVENT_WRITABLE && event->write_cb) {
279 event->write_cb = nullptr;
280 }
281
282 event->mask = event->mask & (~mask);
283 ldout(cct, 30) << __func__ << " delete event end fd=" << fd << " mask=" << mask
284 << " original mask is " << event->mask << dendl;
285}
286
287uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt)
288{
11fdf7f2 289 ceph_assert(in_thread());
7c673cae
FG
290 uint64_t id = time_event_next_id++;
291
292 ldout(cct, 30) << __func__ << " id=" << id << " trigger after " << microseconds << "us"<< dendl;
293 EventCenter::TimeEvent event;
294 clock_type::time_point expire = clock_type::now() + std::chrono::microseconds(microseconds);
295 event.id = id;
296 event.time_cb = ctxt;
297 std::multimap<clock_type::time_point, TimeEvent>::value_type s_val(expire, event);
298 auto it = time_events.insert(std::move(s_val));
299 event_map[id] = it;
300
301 return id;
302}
303
304void EventCenter::delete_time_event(uint64_t id)
305{
11fdf7f2 306 ceph_assert(in_thread());
7c673cae
FG
307 ldout(cct, 30) << __func__ << " id=" << id << dendl;
308 if (id >= time_event_next_id || id == 0)
309 return ;
310
311 auto it = event_map.find(id);
312 if (it == event_map.end()) {
313 ldout(cct, 10) << __func__ << " id=" << id << " not found" << dendl;
314 return ;
315 }
316
317 time_events.erase(it->second);
318 event_map.erase(it);
319}
320
321void EventCenter::wakeup()
322{
323 // No need to wake up since we never sleep
324 if (!pollers.empty() || !driver->need_wakeup())
325 return ;
326
327 ldout(cct, 20) << __func__ << dendl;
328 char buf = 'c';
329 // wake up "event_wait"
330 int n = write(notify_send_fd, &buf, sizeof(buf));
331 if (n < 0) {
332 if (errno != EAGAIN) {
333 ldout(cct, 1) << __func__ << " write notify pipe failed: " << cpp_strerror(errno) << dendl;
334 ceph_abort();
335 }
336 }
337}
338
339int EventCenter::process_time_events()
340{
341 int processed = 0;
342 clock_type::time_point now = clock_type::now();
343 ldout(cct, 30) << __func__ << " cur time is " << now << dendl;
344
345 while (!time_events.empty()) {
346 auto it = time_events.begin();
347 if (now >= it->first) {
348 TimeEvent &e = it->second;
349 EventCallbackRef cb = e.time_cb;
350 uint64_t id = e.id;
351 time_events.erase(it);
352 event_map.erase(id);
353 ldout(cct, 30) << __func__ << " process time event: id=" << id << dendl;
354 processed++;
355 cb->do_request(id);
356 } else {
357 break;
358 }
359 }
360
361 return processed;
362}
363
11fdf7f2 364int EventCenter::process_events(unsigned timeout_microseconds, ceph::timespan *working_dur)
7c673cae
FG
365{
366 struct timeval tv;
367 int numevents;
368 bool trigger_time = false;
369 auto now = clock_type::now();
370
371 auto it = time_events.begin();
372 bool blocking = pollers.empty() && !external_num_events.load();
373 // If exists external events or poller, don't block
374 if (!blocking) {
375 if (it != time_events.end() && now >= it->first)
376 trigger_time = true;
377 tv.tv_sec = 0;
378 tv.tv_usec = 0;
379 } else {
380 clock_type::time_point shortest;
381 shortest = now + std::chrono::microseconds(timeout_microseconds);
382
383 if (it != time_events.end() && shortest >= it->first) {
384 ldout(cct, 30) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl;
385 shortest = it->first;
386 trigger_time = true;
387 if (shortest > now) {
388 timeout_microseconds = std::chrono::duration_cast<std::chrono::microseconds>(
389 shortest - now).count();
390 } else {
391 shortest = now;
392 timeout_microseconds = 0;
393 }
394 }
395 tv.tv_sec = timeout_microseconds / 1000000;
396 tv.tv_usec = timeout_microseconds % 1000000;
397 }
398
399 ldout(cct, 30) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
400 vector<FiredFileEvent> fired_events;
401 numevents = driver->event_wait(fired_events, &tv);
31f18b77 402 auto working_start = ceph::mono_clock::now();
7c673cae
FG
403 for (int j = 0; j < numevents; j++) {
404 int rfired = 0;
405 FileEvent *event;
406 EventCallbackRef cb;
407 event = _get_file_event(fired_events[j].fd);
408
409 /* note the event->mask & mask & ... code: maybe an already processed
410 * event removed an element that fired and we still didn't
411 * processed, so we check if the event is still valid. */
412 if (event->mask & fired_events[j].mask & EVENT_READABLE) {
413 rfired = 1;
414 cb = event->read_cb;
415 cb->do_request(fired_events[j].fd);
416 }
417
418 if (event->mask & fired_events[j].mask & EVENT_WRITABLE) {
419 if (!rfired || event->read_cb != event->write_cb) {
420 cb = event->write_cb;
421 cb->do_request(fired_events[j].fd);
422 }
423 }
424
425 ldout(cct, 30) << __func__ << " event_wq process is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl;
426 }
427
428 if (trigger_time)
429 numevents += process_time_events();
430
431 if (external_num_events.load()) {
432 external_lock.lock();
433 deque<EventCallbackRef> cur_process;
434 cur_process.swap(external_events);
435 external_num_events.store(0);
436 external_lock.unlock();
11fdf7f2 437 numevents += cur_process.size();
7c673cae
FG
438 while (!cur_process.empty()) {
439 EventCallbackRef e = cur_process.front();
440 ldout(cct, 30) << __func__ << " do " << e << dendl;
441 e->do_request(0);
442 cur_process.pop_front();
7c673cae
FG
443 }
444 }
445
446 if (!numevents && !blocking) {
447 for (uint32_t i = 0; i < pollers.size(); i++)
448 numevents += pollers[i]->poll();
449 }
450
31f18b77
FG
451 if (working_dur)
452 *working_dur = ceph::mono_clock::now() - working_start;
7c673cae
FG
453 return numevents;
454}
455
456void EventCenter::dispatch_event_external(EventCallbackRef e)
457{
11fdf7f2
TL
458 uint64_t num = 0;
459 {
460 std::lock_guard lock{external_lock};
461 if (external_num_events > 0 && *external_events.rbegin() == e) {
462 return;
463 }
464 external_events.push_back(e);
465 num = ++external_num_events;
466 }
467 if (num == 1 && !in_thread())
7c673cae
FG
468 wakeup();
469
470 ldout(cct, 30) << __func__ << " " << e << " pending " << num << dendl;
471}