1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_MSG_EVENT_H
18 #define CEPH_MSG_EVENT_H
21 #include <AvailabilityMacros.h>
24 // We use epoll, kqueue, evport, or select, in descending order of performance.
25 #if defined(__linux__)
29 #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
34 #include <sys/feature_tests.h>
35 #ifdef _DTRACE_VERSION
42 #include <condition_variable>
44 #include "common/ceph_time.h"
45 #include "common/dout.h"
46 #include "net_handler.h"
// Event-kind flags. The two values are distinct bits (1 and 2), so they can be
// OR-ed together into a single mask.
// NOTE(review): presumably these are the values carried by the `mask`
// parameters of add_event()/create_file_event() -- confirm against callers.
49 #define EVENT_READABLE 1
50 #define EVENT_WRITABLE 2
// Pure virtual callback hook: every concrete EventCallback must implement it.
// NOTE(review): the parameter name suggests it carries either a file
// descriptor or an event id depending on the kind of event -- confirm against
// the code that invokes callbacks.
57 virtual void do_request(int fd_or_id
) = 0;
// Virtual so that destroying a derived callback through an EventCallback
// pointer (e.g. an EventCallbackRef) runs the derived destructor.
58 virtual ~EventCallback() {} // we want a virtual destructor!!!
// Alias for a raw pointer to an EventCallback. The alias itself expresses no
// ownership; lifetime rules are whatever each user of the alias establishes.
61 typedef EventCallback
* EventCallbackRef
;
63 struct FiredFileEvent
{
69 * EventDriver is a wrapper around the event mechanisms of different OSes.
70 * For example, Linux will use epoll(2), BSD will use kqueue(2), and select(2)
71 * will be used as the fallback when nothing better is available.
// Virtual so that a concrete driver can be destroyed through an EventDriver
// pointer, which is the type EventCenter::get_driver() hands out.
75 virtual ~EventDriver() {} // we want a virtual destructor!!!
// Pure virtual set-up hook for a concrete driver.
// NOTE(review): `center` looks like the EventCenter using this driver and
// `nevent` like a size hint for the number of events -- confirm. The meaning
// of the int return value is not visible in this header.
76 virtual int init(EventCenter
*center
, int nevent
) = 0;
// Pure virtual.
// NOTE(review): judging by the parameter names, this registers interest in
// the `mask` events on `fd`, with `cur_mask` being what is already registered
// for that fd -- confirm against a concrete driver. The return-value
// convention is not visible in this header.
77 virtual int add_event(int fd
, int cur_mask
, int mask
) = 0;
// Pure virtual counterpart of add_event().
// NOTE(review): judging by the parameter names, this removes the `del_mask`
// events from `fd`, with `cur_mask` being what is currently registered --
// confirm against a concrete driver. The return-value convention is not
// visible in this header.
78 virtual int del_event(int fd
, int cur_mask
, int del_mask
) = 0;
// Pure virtual. Takes a non-const reference to a vector of FiredFileEvent and
// a pointer to a struct timeval.
// NOTE(review): presumably waits for at most *tp and reports the ready
// descriptors through `fired_events` -- confirm, including whether `tp` may
// be null and what the int return value means.
79 virtual int event_wait(vector
<FiredFileEvent
> &fired_events
, struct timeval
*tp
) = 0;
// Pure virtual.
// NOTE(review): presumably adjusts the driver's internal capacity to
// `newsize` events -- confirm; the return-value convention is not visible in
// this header.
80 virtual int resize_events(int newsize
) = 0;
// Overridable query; the default implementation always answers true.
// NOTE(review): presumably tells the center whether the driver must be woken
// explicitly when work arrives from another thread -- confirm.
81 virtual bool need_wakeup() { return true; }
85 * EventCenter maintains a set of file descriptors and handles registered events.
// Capacity of the AssociatedCenters::centers array, and the exclusive upper
// bound that submit_to() asserts on the center index it is given.
90 static const int MAX_EVENTCENTER
= 24;
// Clock whose time_point type is used as the key of the time_events multimap.
93 using clock_type
= ceph::coarse_mono_clock
;
95 struct AssociatedCenters
{
96 EventCenter
*centers
[MAX_EVENTCENTER
];
97 AssociatedCenters(CephContext
*c
) {
98 memset(centers
, 0, MAX_EVENTCENTER
* sizeof(EventCenter
*));
// Callback slots for this file event; the constructor sets both to NULL.
// NOTE(review): presumably read_cb is invoked for EVENT_READABLE and write_cb
// for EVENT_WRITABLE -- confirm in the event-processing code.
104 EventCallbackRef read_cb
;
105 EventCallbackRef write_cb
;
// Creates an empty entry: mask is 0 and neither callback is installed (both
// pointers are NULL).
106 FileEvent(): mask(0), read_cb(NULL
), write_cb(NULL
) {}
// Callback slot for this time event; the constructor sets it to NULL.
111 EventCallbackRef time_cb
;
// Creates an empty entry: id is 0 and no callback is installed (time_cb is
// NULL).
113 TimeEvent(): id(0), time_cb(NULL
) {}
118 * A Poller object is invoked once each time through the dispatcher's
119 * inner polling loop.
// Constructor declaration only; the definition is not part of this view.
// Marked explicit, so a Poller is never created by implicit conversion.
// NOTE(review): presumably `center` becomes the owning EventCenter and
// `pollerName` the human-readable debug name described by the member
// comments of this class -- confirm in the definition.
123 explicit Poller(EventCenter
* center
, const string
& pollerName
);
127 * This method is defined by a subclass and invoked once by the
128 * center during each pass through its inner polling loop.
131 * 1 means that this poller did useful work during this call.
132 * 0 means that the poller found no work to do.
// Pure virtual: every concrete Poller must supply its own polling step.
134 virtual int poll() = 0;
137 /// The EventCenter object that owns this Poller. NULL means the
138 /// EventCenter has been deleted.
141 /// Human-readable string name given to the poller to make it
142 /// easy to identify for debugging. For most pollers just passing
143 /// in the subclass name probably makes sense.
146 /// Index of this Poller in EventCenter::pollers. Allows deletion
147 /// without having to scan all the entries in pollers. -1 means
148 /// this poller isn't currently in EventCenter::pollers (happens
149 /// after EventCenter::reset).
157 // Used only for external events
// NOTE(review): presumably guards external_events -- confirm in the
// definition of dispatch_event_external().
159 std::mutex external_lock
;
// Atomic counter; the EventCenter constructor initialises it to 0.
160 std::atomic_ulong external_num_events
;
// Queue of callbacks. NOTE(review): presumably filled by
// dispatch_event_external() and drained by the owning thread -- confirm.
161 deque
<EventCallbackRef
> external_events
;
// Per-descriptor event records; _get_file_event() indexes this vector
// directly with the fd.
162 vector
<FileEvent
> file_events
;
// Time events keyed by a clock_type time point; a multimap, so several
// events may share the same time point.
164 std::multimap
<clock_type::time_point
, TimeEvent
> time_events
;
165 // Keeps track of all of the pollers currently defined. We don't
166 // use an intrusive list here because it isn't reentrant: we need
167 // to add/remove elements while the center is traversing the list.
168 std::vector
<Poller
*> pollers
;
// Maps a uint64_t key to an iterator of the same multimap type as
// time_events. NOTE(review): presumably time-event id -> its entry in
// time_events, so delete_time_event(id) can find it -- confirm.
169 std::map
<uint64_t, std::multimap
<clock_type::time_point
, TimeEvent
>::iterator
> event_map
;
// Initialised to 1 by the constructor. NOTE(review): presumably the id the
// next create_time_event() call hands out -- confirm.
170 uint64_t time_event_next_id
;
// Initialised to -1 by the constructor (together with notify_send_fd).
171 int notify_receive_fd
;
// Initialised to NULL by the constructor.
174 EventCallbackRef notify_handler
;
// Null by default. submit_to() asserts that it is non-null and then uses
// global_centers->centers[i] to find the target center.
176 AssociatedCenters
*global_centers
= nullptr;
// Declaration only; the definition is not part of this view.
// NOTE(review): presumably runs the callbacks of due entries in time_events
// and returns how many were processed -- confirm in the definition.
178 int process_time_events();
179 FileEvent
*_get_file_event(int fd
) {
181 return &file_events
[fd
];
// Puts the center into an "unconfigured" state: no pending external events,
// no driver, time-event ids starting at 1, both notify descriptors invalid
// (-1), no notify handler, and index 0. `c` is forwarded to the `net` member.
// Marked explicit, so a CephContext* never converts implicitly.
// NOTE(review): listing line 186 is absent from this view, so further member
// initialisers may exist between the signature and external_num_events.
185 explicit EventCenter(CephContext
*c
):
187 external_num_events(0),
188 driver(NULL
), time_event_next_id(1),
189 notify_receive_fd(-1), notify_send_fd(-1), net(c
),
190 notify_handler(NULL
), idx(0) { }
// Declaration only; takes an output stream pointer and returns a stream
// reference.
// NOTE(review): presumably writes this center's log-line prefix for the dout
// logging macros -- confirm in the definition.
192 ostream
& _event_prefix(std::ostream
*_dout
);
// Declaration only; the definition is not part of this view.
// NOTE(review): presumably creates the driver for `nevent` events, records
// `idx` as this center's id (see get_id()) and uses `t` to select a
// transport/driver type -- confirm; the return-value convention is not
// visible here.
194 int init(int nevent
, unsigned idx
, const std::string
&t
);
// Returns the `owner` thread id, i.e. the value in_thread() compares with
// pthread_self().
196 pthread_t
get_owner() const { return owner
; }
// Returns this center's index (`idx`), which the constructor sets to 0.
197 unsigned get_id() const { return idx
; }
// Returns the raw `driver` pointer. The constructor sets it to NULL, so the
// result may be null until a driver has been installed.
199 EventDriver
*get_driver() { return driver
; }
201 // Used by internal thread
// Declaration only; the definition is not part of this view.
// NOTE(review): presumably registers `ctxt` as the callback for the `mask`
// events (EVENT_READABLE / EVENT_WRITABLE) on `fd` -- confirm, along with the
// return-value convention.
202 int create_file_event(int fd
, int mask
, EventCallbackRef ctxt
);
// Declaration only; the definition is not part of this view.
// NOTE(review): the parameter name suggests a delay in milliseconds after
// which `ctxt` fires; the uint64_t result is presumably the id accepted by
// delete_time_event() -- confirm both.
203 uint64_t create_time_event(uint64_t milliseconds
, EventCallbackRef ctxt
);
// Declaration only; the definition is not part of this view.
// NOTE(review): presumably removes the `mask` events previously registered
// for `fd` with create_file_event() -- confirm.
204 void delete_file_event(int fd
, int mask
);
// Declaration only; the definition is not part of this view.
// NOTE(review): presumably cancels the time event whose id was returned by
// create_time_event() -- confirm.
205 void delete_time_event(uint64_t id
);
// Declaration only; the definition is not part of this view.
// NOTE(review): the parameter name indicates a timeout in microseconds;
// presumably one pass of the event loop, returning the number of events
// handled -- confirm.
206 int process_events(int timeout_microseconds
);
209 // Used by external thread
// Declaration only; the definition is not part of this view. submit_to()
// calls this on the target center to hand over a C_submit_event.
// NOTE(review): presumably enqueues `e` on external_events and wakes the
// owning thread -- confirm in the definition.
210 void dispatch_event_external(EventCallbackRef e
);
211 inline bool in_thread() const {
212 return pthread_equal(pthread_self(), owner
);
216 template <typename func
>
217 class C_submit_event
: public EventCallback
{
219 std::condition_variable cond
;
// Moves the callable `_f` into the member `f` and stores `nw` in `nonwait`.
// submit_to() passes true for the heap-allocated event and false for the
// stack-allocated one.
224 C_submit_event(func
&&_f
, bool nw
)
225 : f(std::move(_f
)), nonwait(nw
) {}
226 void do_request(int id
) override
{
238 std::unique_lock
<std::mutex
> l(lock
);
245 template <typename func
>
246 void submit_to(int i
, func
&&f
, bool nowait
= false) {
247 assert(i
< MAX_EVENTCENTER
&& global_centers
);
248 EventCenter
*c
= global_centers
->centers
[i
];
250 if (!nowait
&& c
->in_thread()) {
255 C_submit_event
<func
> *event
= new C_submit_event
<func
>(std::move(f
), true);
256 c
->dispatch_event_external(event
);
258 C_submit_event
<func
> event(std::move(f
), false);
259 c
->dispatch_event_external(&event
);