1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_MSG_EVENT_H
18 #define CEPH_MSG_EVENT_H
21 #include <AvailabilityMacros.h>
24 // We use epoll, kqueue, evport, select in descending order by performance.
25 #if defined(__linux__)
29 #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
34 #include <sys/feature_tests.h>
35 #ifdef _DTRACE_VERSION
42 #include <condition_variable>
44 #include "common/ceph_time.h"
45 #include "common/dout.h"
46 #include "net_handler.h"
49 #define EVENT_READABLE 1
50 #define EVENT_WRITABLE 2
57 virtual void do_request(uint64_t fd_or_id
) = 0;
58 virtual ~EventCallback() {} // we want a virtual destructor!!!
61 typedef EventCallback
* EventCallbackRef
;
63 struct FiredFileEvent
{
69 * EventDriver is a wrap of event mechanisms depends on different OS.
70 * For example, Linux will use epoll(2), BSD will use kqueue(2) and select will
71 * be used for worst condition.
75 virtual ~EventDriver() {} // we want a virtual destructor!!!
76 virtual int init(EventCenter
*center
, int nevent
) = 0;
77 virtual int add_event(int fd
, int cur_mask
, int mask
) = 0;
78 virtual int del_event(int fd
, int cur_mask
, int del_mask
) = 0;
79 virtual int event_wait(vector
<FiredFileEvent
> &fired_events
, struct timeval
*tp
) = 0;
80 virtual int resize_events(int newsize
) = 0;
81 virtual bool need_wakeup() { return true; }
85 * EventCenter maintain a set of file descriptor and handle registered events.
90 static const int MAX_EVENTCENTER
= 24;
93 using clock_type
= ceph::coarse_mono_clock
;
95 struct AssociatedCenters
{
96 EventCenter
*centers
[MAX_EVENTCENTER
];
98 // FIPS zeroization audit 20191115: this memset is not security related.
99 memset(centers
, 0, MAX_EVENTCENTER
* sizeof(EventCenter
*));
105 EventCallbackRef read_cb
;
106 EventCallbackRef write_cb
;
107 FileEvent(): mask(0), read_cb(NULL
), write_cb(NULL
) {}
112 EventCallbackRef time_cb
;
114 TimeEvent(): id(0), time_cb(NULL
) {}
119 * A Poller object is invoked once each time through the dispatcher's
120 * inner polling loop.
124 explicit Poller(EventCenter
* center
, const string
& pollerName
);
128 * This method is defined by a subclass and invoked once by the
129 * center during each pass through its inner polling loop.
132 * 1 means that this poller did useful work during this call.
133 * 0 means that the poller found no work to do.
135 virtual int poll() = 0;
138 /// The EventCenter object that owns this Poller. NULL means the
139 /// EventCenter has been deleted.
142 /// Human-readable string name given to the poller to make it
143 /// easy to identify for debugging. For most pollers just passing
144 /// in the subclass name probably makes sense.
147 /// Index of this Poller in EventCenter::pollers. Allows deletion
148 /// without having to scan all the entries in pollers. -1 means
149 /// this poller isn't currently in EventCenter::pollers (happens
150 /// after EventCenter::reset).
158 // Used only to external event
160 std::mutex external_lock
;
161 std::atomic_ulong external_num_events
;
162 deque
<EventCallbackRef
> external_events
;
163 vector
<FileEvent
> file_events
;
165 std::multimap
<clock_type::time_point
, TimeEvent
> time_events
;
166 // Keeps track of all of the pollers currently defined. We don't
167 // use an intrusive list here because it isn't reentrant: we need
168 // to add/remove elements while the center is traversing the list.
169 std::vector
<Poller
*> pollers
;
170 std::map
<uint64_t, std::multimap
<clock_type::time_point
, TimeEvent
>::iterator
> event_map
;
171 uint64_t time_event_next_id
;
172 int notify_receive_fd
;
175 EventCallbackRef notify_handler
;
177 AssociatedCenters
*global_centers
= nullptr;
179 int process_time_events();
180 FileEvent
*_get_file_event(int fd
) {
181 ceph_assert(fd
< nevent
);
182 return &file_events
[fd
];
186 explicit EventCenter(CephContext
*c
):
188 external_num_events(0),
189 driver(NULL
), time_event_next_id(1),
190 notify_receive_fd(-1), notify_send_fd(-1), net(c
),
191 notify_handler(NULL
), idx(0) { }
193 ostream
& _event_prefix(std::ostream
*_dout
);
195 int init(int nevent
, unsigned idx
, const std::string
&t
);
197 pthread_t
get_owner() const { return owner
; }
198 unsigned get_id() const { return idx
; }
200 EventDriver
*get_driver() { return driver
; }
202 // Used by internal thread
203 int create_file_event(int fd
, int mask
, EventCallbackRef ctxt
);
204 uint64_t create_time_event(uint64_t milliseconds
, EventCallbackRef ctxt
);
205 void delete_file_event(int fd
, int mask
);
206 void delete_time_event(uint64_t id
);
207 int process_events(unsigned timeout_microseconds
, ceph::timespan
*working_dur
= nullptr);
210 // Used by external thread
211 void dispatch_event_external(EventCallbackRef e
);
212 inline bool in_thread() const {
213 return pthread_equal(pthread_self(), owner
);
217 template <typename func
>
218 class C_submit_event
: public EventCallback
{
220 std::condition_variable cond
;
225 C_submit_event(func
&&_f
, bool nw
)
226 : f(std::move(_f
)), nonwait(nw
) {}
227 void do_request(uint64_t id
) override
{
238 ceph_assert(!nonwait
);
239 std::unique_lock
<std::mutex
> l(lock
);
246 template <typename func
>
247 void submit_to(int i
, func
&&f
, bool nowait
= false) {
248 ceph_assert(i
< MAX_EVENTCENTER
&& global_centers
);
249 EventCenter
*c
= global_centers
->centers
[i
];
251 if (!nowait
&& c
->in_thread()) {
256 C_submit_event
<func
> *event
= new C_submit_event
<func
>(std::move(f
), true);
257 c
->dispatch_event_external(event
);
259 C_submit_event
<func
> event(std::move(f
), false);
260 c
->dispatch_event_external(&event
);