1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_MSG_EVENT_H
18 #define CEPH_MSG_EVENT_H
21 #include <AvailabilityMacros.h>
24 // We prefer epoll, kqueue, evport, then select, in descending order of performance.
25 #if defined(__linux__)
29 #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
38 #include <sys/feature_tests.h>
39 #ifdef _DTRACE_VERSION
46 #include <condition_variable>
48 #include "common/ceph_time.h"
49 #include "common/dout.h"
50 #include "net_handler.h"
// Event flag constants. The values 1 and 2 occupy distinct bits, so the two
// flags can be OR-ed together into a single mask.
// NOTE(review): presumably these are the values carried by the `mask`
// arguments used elsewhere in this header -- confirm.
53 #define EVENT_READABLE 1
54 #define EVENT_WRITABLE 2
// Callback entry point. Pure virtual, so every concrete callback type must
// implement it. The argument is named fd_or_id -- presumably the file
// descriptor or the event id that fired; confirm against the callers.
61 virtual void do_request(uint64_t fd_or_id
) = 0;
// Virtual destructor with an empty body, so a derived callback can be
// destroyed through an EventCallback pointer.
62 virtual ~EventCallback() {} // we want a virtual destructor!!!
// EventCallbackRef is a plain (raw) pointer to an EventCallback. The alias
// itself expresses nothing about who owns the pointed-to callback.
65 typedef EventCallback
* EventCallbackRef
;
67 struct FiredFileEvent
{
73 * EventDriver wraps the event mechanism, which depends on the OS.
74 * For example, Linux uses epoll(2), BSD uses kqueue(2), and select(2) is
75 * used as the last-resort fallback.
// Backend interface: every member below except need_wakeup() is pure virtual
// and must be provided by the concrete driver.
79 virtual ~EventDriver() {} // we want a virtual destructor!!!
// Receives the EventCenter and an event count `nevent`.
// NOTE(review): presumably sets the backend up for that center -- confirm in
// the concrete drivers.
80 virtual int init(EventCenter
*center
, int nevent
) = 0;
// Receives a descriptor, its current mask and the mask to add.
// NOTE(review): masks presumably combine EVENT_READABLE / EVENT_WRITABLE.
81 virtual int add_event(int fd
, int cur_mask
, int mask
) = 0;
// Receives a descriptor, its current mask and the mask to remove.
82 virtual int del_event(int fd
, int cur_mask
, int del_mask
) = 0;
// `fired_events` is a non-const reference, so a driver can write results into
// it. `tp` is a timeval pointer -- presumably the wait timeout; confirm
// whether a null pointer is accepted.
83 virtual int event_wait(std::vector
<FiredFileEvent
> &fired_events
, struct timeval
*tp
) = 0;
// Receives the requested new size.
84 virtual int resize_events(int newsize
) = 0;
// Not pure: the default implementation returns true; a driver may override.
85 virtual bool need_wakeup() { return true; }
89 * EventCenter maintains a set of file descriptors and handles registered events.
// Size of the AssociatedCenters::centers array; submit_to() asserts that the
// center index it is given is below this value.
94 static const int MAX_EVENTCENTER
= 24;
// Clock whose time_point is the key type of the time_events multimap.
97 using clock_type
= ceph::coarse_mono_clock
;
99 struct AssociatedCenters
{
100 EventCenter
*centers
[MAX_EVENTCENTER
];
101 AssociatedCenters() {
102 // FIPS zeroization audit 20191115: this memset is not security related.
103 memset(centers
, 0, MAX_EVENTCENTER
* sizeof(EventCenter
*));
109 EventCallbackRef read_cb
;
110 EventCallbackRef write_cb
;
111 FileEvent(): mask(0), read_cb(NULL
), write_cb(NULL
) {}
116 EventCallbackRef time_cb
;
118 TimeEvent(): id(0), time_cb(NULL
) {}
123 * A Poller object is invoked once each time through the dispatcher's
124 * inner polling loop.
128 explicit Poller(EventCenter
* center
, const std::string
& pollerName
);
132 * This method is defined by a subclass and invoked once by the
133 * center during each pass through its inner polling loop.
136 * 1 means that this poller did useful work during this call.
137 * 0 means that the poller found no work to do.
139 virtual int poll() = 0;
142 /// The EventCenter object that owns this Poller. NULL means the
143 /// EventCenter has been deleted.
146 /// Human-readable string name given to the poller to make it
147 /// easy to identify for debugging. For most pollers just passing
148 /// in the subclass name probably makes sense.
149 std::string poller_name
;
151 /// Index of this Poller in EventCenter::pollers. Allows deletion
152 /// without having to scan all the entries in pollers. -1 means
153 /// this poller isn't currently in EventCenter::pollers (happens
154 /// after EventCenter::reset).
162 // Used only for external events
// NOTE(review): presumably guards external_events -- confirm in the .cc file.
164 std::mutex external_lock
;
// Atomic counter; the constructor initializes it to 0.
165 std::atomic_ulong external_num_events
;
// Queue of callback pointers.
166 std::deque
<EventCallbackRef
> external_events
;
// Indexed directly by file descriptor (see _get_file_event()).
167 std::vector
<FileEvent
> file_events
;
// Time events keyed by a clock_type time point. It is a multimap, so several
// events may share the same time point.
169 std::multimap
<clock_type::time_point
, TimeEvent
> time_events
;
170 // Keeps track of all of the pollers currently defined. We don't
171 // use an intrusive list here because it isn't reentrant: we need
172 // to add/remove elements while the center is traversing the list.
173 std::vector
<Poller
*> pollers
;
// Maps a 64-bit key to an iterator of the same multimap type as time_events.
// NOTE(review): presumably time-event id -> its entry in time_events.
174 std::map
<uint64_t, std::multimap
<clock_type::time_point
, TimeEvent
>::iterator
> event_map
;
// The constructor initializes this to 1.
175 uint64_t time_event_next_id
;
// The constructor initializes this to -1.
176 int notify_receive_fd
;
// Constructed from the CephContext pointer passed to the constructor.
178 ceph::NetHandler net
;
// The constructor initializes this to NULL.
179 EventCallbackRef notify_handler
;
// Null by default; submit_to() asserts it is non-null before indexing
// global_centers->centers.
181 AssociatedCenters
*global_centers
= nullptr;
183 int process_time_events();
184 FileEvent
*_get_file_event(int fd
) {
185 ceph_assert(fd
< nevent
);
186 return &file_events
[fd
];
190 explicit EventCenter(CephContext
*c
):
192 external_num_events(0),
193 driver(NULL
), time_event_next_id(1),
194 notify_receive_fd(-1), notify_send_fd(-1), net(c
),
195 notify_handler(NULL
), center_id(0) { }
197 std::ostream
& _event_prefix(std::ostream
*_dout
);
199 int init(int nevent
, unsigned center_id
, const std::string
&type
);
201 pthread_t
get_owner() const { return owner
; }
202 unsigned get_id() const { return center_id
; }
204 EventDriver
*get_driver() { return driver
; }
206 // Used by internal thread
// Takes a descriptor, a mask and a callback pointer.
// NOTE(review): presumably registers `ctxt` for the events in `mask` on `fd`;
// the meaning of the int return value is not visible here -- confirm.
207 int create_file_event(int fd
, int mask
, EventCallbackRef ctxt
);
// Takes a value named `milliseconds` and a callback pointer; returns a
// uint64_t.
// NOTE(review): presumably the returned value is the id accepted by
// delete_time_event() -- confirm.
208 uint64_t create_time_event(uint64_t milliseconds
, EventCallbackRef ctxt
);
// Takes a descriptor and a mask.
209 void delete_file_event(int fd
, int mask
);
// Takes a 64-bit id.
210 void delete_time_event(uint64_t id
);
// `working_dur` is optional: it defaults to nullptr.
// NOTE(review): presumably an out-parameter for time spent working -- confirm.
211 int process_events(unsigned timeout_microseconds
, ceph::timespan
*working_dur
= nullptr);
214 // Used by external thread
215 void dispatch_event_external(EventCallbackRef e
);
216 inline bool in_thread() const {
217 return pthread_equal(pthread_self(), owner
);
221 template <typename func
>
222 class C_submit_event
: public EventCallback
{
224 std::condition_variable cond
;
229 C_submit_event(func
&&_f
, bool nowait
)
230 : f(std::move(_f
)), nonwait(nowait
) {}
231 void do_request(uint64_t id
) override
{
242 ceph_assert(!nonwait
);
243 std::unique_lock
<std::mutex
> l(lock
);
250 template <typename func
>
251 void submit_to(int i
, func
&&f
, bool always_async
= false) {
252 ceph_assert(i
< MAX_EVENTCENTER
&& global_centers
);
253 EventCenter
*c
= global_centers
->centers
[i
];
256 C_submit_event
<func
> *event
= new C_submit_event
<func
>(std::move(f
), true);
257 c
->dispatch_event_external(event
);
258 } else if (c
->in_thread()) {
262 C_submit_event
<func
> event(std::move(f
), false);
263 c
->dispatch_event_external(&event
);