]>
git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/EventKqueue.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #include "common/errno.h"
18 #include "EventKqueue.h"
20 #define dout_subsys ceph_subsys_ms
23 #define dout_prefix *_dout << "KqueueDriver."
25 #define KEVENT_NOWAIT 0
// Probe the kqueue descriptor held in kqfd.
// Calls kevent() with an empty change list (0 changes) and an empty event
// list (NULL, 0), passing KEVENT_NOWAIT as the timeout argument. When the
// call returns -1 the failure is logged through ldout at level 0 together
// with cpp_strerror(errno).
// NOTE(review): the declaration of `ke`, the return statements and the
// closing braces are not visible here, so the return values are not
// documented; test_thread_change() treats a result < 0 as failure.
27 int KqueueDriver::test_kqfd() {
29 if (kevent(kqfd
, ke
, 0, NULL
, 0, KEVENT_NOWAIT
) == -1) {
30 ldout(cct
,0) << __func__
<< " invalid kqfd = " << kqfd
31 << cpp_strerror(errno
) << dendl
;
// Re-register saved events on the descriptor in kqfd.
// For an index i whose sav_events[i].mask is non-zero, builds up to two
// changes in `ke` (EVFILT_READ when EVENT_READABLE is set, EVFILT_WRITE when
// EVENT_WRITABLE is set) and submits them with kevent(). A kevent() result
// of -1 is logged through ldout at level 0.
// NOTE(review): the loop over i, the declarations of `ke`/`num`, what happens
// for a zero mask, and the return statements are not visible here.
37 int KqueueDriver::restore_events() {
41 ldout(cct
,30) << __func__
<< " on kqfd = " << kqfd
<< dendl
;
// Entries with an empty saved mask are tested first; the statement executed
// for them is not visible here.
44 if (sav_events
[i
].mask
== 0 )
46 ldout(cct
,30) << __func__
<< " restore kqfd = " << kqfd
47 << " fd = " << i
<< " mask " << sav_events
[i
].mask
<< dendl
;
// The index i is used directly as the kevent ident, i.e. sav_events is
// indexed by file descriptor (add_event/del_event index it with fd).
// NOTE(review): add_event() registers filters with EV_ADD|EV_CLEAR, whereas
// the two EV_SET calls below use plain EV_ADD -- confirm the difference is
// intended.
48 if (sav_events
[i
].mask
& EVENT_READABLE
)
49 EV_SET(&ke
[num
++], i
, EVFILT_READ
, EV_ADD
, 0, 0, NULL
);
50 if (sav_events
[i
].mask
& EVENT_WRITABLE
)
51 EV_SET(&ke
[num
++], i
, EVFILT_WRITE
, EV_ADD
, 0, 0, NULL
);
// Submit the `num` changes built above; no events are requested back
// (event list NULL, 0).
53 if (kevent(kqfd
, ke
, num
, NULL
, 0, KEVENT_NOWAIT
) == -1) {
54 ldout(cct
,0) << __func__
<< " unable to add event: "
55 << cpp_strerror(errno
) << dendl
;
// Detect that the driver is being used from a different thread than the one
// recorded in mythread, or that the current kqfd no longer passes
// test_kqfd(), and log/recover accordingly.
//
// funcname: name of the calling method; it is used as the prefix of every
//           log line emitted here (callers pass __func__).
//
// Visible behaviour:
//   - if pthread_self() differs from mythread: log at level 20 and store the
//     new thread id in mythread;
//   - else if kqfd != -1 and test_kqfd() < 0: log a level-0 warning;
//   - afterwards a new/old kqfd pair is logged, a kqueue failure is reported
//     through lderr, and restore_events() is called, with a result < 0
//     reported through lderr.
// NOTE(review): the statements that create the new kqfd, assign oldkqfd,
// the conditions guarding the lderr calls, and all return statements are not
// visible here.
63 int KqueueDriver::test_thread_change(const char* funcname
) {
64 // check to see if we changed thread, because that invalidates
65 // the kqfd and we need to restore that
68 if (!pthread_equal(mythread
, pthread_self())) {
69 ldout(cct
,20) << funcname
<< " We changed thread from " << mythread
70 << " to " << pthread_self() << dendl
;
71 mythread
= pthread_self();
73 } else if ((kqfd
!= -1) && (test_kqfd() < 0)) {
74 // should this ever happen?
75 // It would be strange to change kqfd with thread change.
76 // Might need to change this into a ceph_assert() in the future.
77 ldout(cct
,0) << funcname
<< " Warning: Recreating old kqfd. "
78 << "This should not happen!!!" << dendl
;
// Report the descriptor now in kqfd next to the previous value (oldkqfd).
83 ldout(cct
,30) << funcname
<< " kqueue: new kqfd = " << kqfd
84 << " (was: " << oldkqfd
<< ")"
87 lderr(cct
) << funcname
<< " unable to do kqueue: "
88 << cpp_strerror(errno
) << dendl
;
// Re-register the saved events (see sav_events) on the current kqfd.
91 if (restore_events()< 0) {
92 lderr(cct
) << funcname
<< " unable restore all events "
93 << cpp_strerror(errno
) << dendl
;
// Initialise the driver state.
//
// c:      event center pointer; its use is not visible here.
// nevent: number of entries allocated for both res_events and sav_events.
//
// Records the calling thread in mythread, then allocates and zero-fills
// two arrays of nevent entries each: res_events (struct kevent) and
// sav_events (struct SaveEvent). malloc failures are reported through lderr
// with cpp_strerror(errno).
// NOTE(review): the null checks guarding the lderr calls, the assignments of
// size/sav_max/kqfd and the return statements are not visible here.
100 int KqueueDriver::init(EventCenter
*c
, int nevent
)
102 // keep track of possible changes of our thread
103 // because change of thread kills the kqfd
104 mythread
= pthread_self();
106 // Reserve the space to accept the kevent return events.
107 res_events
= (struct kevent
*)malloc(sizeof(struct kevent
)*nevent
);
109 lderr(cct
) << __func__
<< " unable to malloc memory: "
110 << cpp_strerror(errno
) << dendl
;
113 memset(res_events
, 0, sizeof(struct kevent
)*nevent
);
116 // Reserve the space to keep all of the events set, so it can be redone
117 // when we change thread ID.
118 sav_events
= (struct SaveEvent
*)malloc(sizeof(struct SaveEvent
)*nevent
);
120 lderr(cct
) << __func__
<< " unable to malloc memory: "
121 << cpp_strerror(errno
) << dendl
;
124 memset(sav_events
, 0, sizeof(struct SaveEvent
)*nevent
);
127 // Delay assigning a descriptor until it is really needed.
// Register interest in fd for the event bits in add_mask.
//
// fd:       file descriptor; used as the kevent ident and as the index into
//           sav_events.
// cur_mask: event bits already registered for fd (only used for logging and
//           for the saved mask).
// add_mask: event bits to add; EVENT_READABLE and EVENT_WRITABLE are
//           examined.
//
// After test_thread_change(), one change per requested bit is built with
// flags EV_ADD|EV_CLEAR and submitted with kevent(); a result of -1 is
// reported through lderr. The combined mask (cur_mask | add_mask) is then
// stored in sav_events[fd].mask.
// NOTE(review): the handling of `r`, the declarations of `ke`/`num`, and the
// return statements are not visible here.
133 int KqueueDriver::add_event(int fd
, int cur_mask
, int add_mask
)
138 ldout(cct
,30) << __func__
<< " add event kqfd = " << kqfd
<< " fd = " << fd
139 << " cur_mask = " << cur_mask
<< " add_mask = " << add_mask
// Make sure kqfd is usable from the current thread before touching it.
142 int r
= test_thread_change(__func__
);
146 if (add_mask
& EVENT_READABLE
)
147 EV_SET(&ke
[num
++], fd
, EVFILT_READ
, EV_ADD
|EV_CLEAR
, 0, 0, NULL
);
148 if (add_mask
& EVENT_WRITABLE
)
149 EV_SET(&ke
[num
++], fd
, EVFILT_WRITE
, EV_ADD
|EV_CLEAR
, 0, 0, NULL
);
// Submit the `num` changes; no events are requested back (NULL, 0).
152 if (kevent(kqfd
, ke
, num
, NULL
, 0, KEVENT_NOWAIT
) == -1) {
153 lderr(cct
) << __func__
<< " unable to add event: "
154 << cpp_strerror(errno
) << dendl
;
// Grow the saved-event table by a fixed 5000 entries.
// NOTE(review): the condition guarding this call is not visible here; a
// single +5000 step only makes sav_events[fd] valid for
// fd < sav_max + 5000 -- confirm fd cannot exceed that.
160 resize_events(sav_max
+5000);
// Remember the full mask so restore_events() can re-register it later.
161 sav_events
[fd
].mask
= cur_mask
| add_mask
;
// Remove interest in fd for the event bits in del_mask.
//
// fd:       file descriptor; used as the kevent ident and as the index into
//           sav_events.
// cur_mask: event bits currently registered for fd.
// del_mask: event bits to remove.
//
// Only bits present in both cur_mask and del_mask are deleted from the
// kqueue (EV_DELETE on EVFILT_READ / EVFILT_WRITE). A kevent() result < 0 is
// reported through lderr. The remaining mask (cur_mask & ~del_mask) is
// stored in sav_events[fd].mask.
// NOTE(review): the handling of the test_thread_change() result, the
// declarations of `ke`/`num`, and the return statements are not visible
// here.
165 int KqueueDriver::del_event(int fd
, int cur_mask
, int del_mask
)
// Bits that are both registered and requested for deletion.
169 int mask
= cur_mask
& del_mask
;
171 ldout(cct
,30) << __func__
<< " delete event kqfd = " << kqfd
172 << " fd = " << fd
<< " cur_mask = " << cur_mask
173 << " del_mask = " << del_mask
<< dendl
;
// Make sure kqfd is usable from the current thread before touching it.
175 int r
= test_thread_change(__func__
);
179 if (mask
& EVENT_READABLE
)
180 EV_SET(&ke
[num
++], fd
, EVFILT_READ
, EV_DELETE
, 0, 0, NULL
);
181 if (mask
& EVENT_WRITABLE
)
182 EV_SET(&ke
[num
++], fd
, EVFILT_WRITE
, EV_DELETE
, 0, 0, NULL
);
// `r` is reused for the kevent() result; no events are requested back.
186 if ((r
= kevent(kqfd
, ke
, num
, NULL
, 0, KEVENT_NOWAIT
)) < 0) {
187 lderr(cct
) << __func__
<< " kevent: delete fd=" << fd
<< " mask=" << mask
188 << " failed." << cpp_strerror(errno
) << dendl
;
192 // keep the administration
193 sav_events
[fd
].mask
= cur_mask
& ~del_mask
;
// Grow the sav_events table to hold newsize entries.
//
// newsize: requested number of SaveEvent entries; nothing visible is done
//          unless newsize > sav_max.
//
// The table is reallocated in place, a failure is reported through lderr and
// enforced with ceph_assert(sav_events), and part of the table is then
// zero-filled.
// NOTE(review): the null check guarding the lderr call, the update of
// sav_max and the return statements are not visible here.
197 int KqueueDriver::resize_events(int newsize
)
199 ldout(cct
,30) << __func__
<< " kqfd = " << kqfd
<< "newsize = " << newsize
201 if (newsize
> sav_max
) {
// NOTE(review): the realloc result overwrites sav_events directly, so the
// old block would be unreachable on failure; the ceph_assert below makes
// that case fatal anyway.
202 sav_events
= (struct SaveEvent
*)realloc(sav_events
, sizeof(struct SaveEvent
)*newsize
);
204 lderr(cct
) << __func__
<< " unable to realloc memory: "
205 << cpp_strerror(errno
) << dendl
;
206 ceph_assert(sav_events
);
// NOTE(review): the zeroed region starts at index `size` but its length is
// (newsize - sav_max). If size != sav_max at this point, this clears
// entries that are already in use and leaves part of the new tail
// uninitialised -- confirm whether the start index should be sav_max.
209 memset(&sav_events
[size
], 0, sizeof(struct SaveEvent
)*(newsize
-sav_max
));
// Wait for events on kqfd and translate them into FiredFileEvent entries.
//
// fired_events: output vector; resized to numevents and filled with the fd
//               and event mask of each returned kevent.
// tvp:          timeout as a struct timeval; its tv_sec/tv_usec are copied
//               into a struct timespec for kevent().
//
// Two kevent() calls are visible: one passing &timeout and one passing
// KEVENT_NOWAIT; both request up to `size` events into res_events.
// NOTE(review): the branch selecting between the two calls, the handling of
// `r`, the assignment of numevents, the declarations of `j`/`mask`, and the
// end of the function (including its return value) are not visible here.
215 int KqueueDriver::event_wait(vector
<FiredFileEvent
> &fired_events
, struct timeval
*tvp
)
217 int retval
, numevents
= 0;
218 struct timespec timeout
;
220 ldout(cct
,10) << __func__
<< " kqfd = " << kqfd
<< dendl
;
// Make sure kqfd is usable from the current thread before waiting on it.
222 int r
= test_thread_change(__func__
);
// Convert the timeval to a timespec: microseconds become nanoseconds.
227 timeout
.tv_sec
= tvp
->tv_sec
;
228 timeout
.tv_nsec
= tvp
->tv_usec
* 1000;
229 ldout(cct
,20) << __func__
<< " "
230 << timeout
.tv_sec
<< " sec "
231 << timeout
.tv_nsec
<< " nsec"
// Wait with the converted timeout; no changes are submitted (NULL, 0).
233 retval
= kevent(kqfd
, NULL
, 0, res_events
, size
, &timeout
);
235 ldout(cct
,30) << __func__
<< " event_wait: " << " NULL" << dendl
;
// NOTE(review): KEVENT_NOWAIT is defined as 0 in this file, so this call
// passes a null timeout pointer; per kevent(2) a null timeout waits
// indefinitely when events are requested -- confirm the macro name matches
// the intended behaviour.
236 retval
= kevent(kqfd
, NULL
, 0, res_events
, size
, KEVENT_NOWAIT
);
239 ldout(cct
,25) << __func__
<< " kevent retval: " << retval
<< dendl
;
241 lderr(cct
) << __func__
<< " kqueue error: "
242 << cpp_strerror(errno
) << dendl
;
// A return value of 0 from kevent() is logged as a timeout.
// NOTE(review): `timeout` is declared without an initializer; if this
// branch can be reached after the call that does not pass &timeout, the
// fields logged below would be read uninitialised -- confirm.
244 } else if (retval
== 0) {
245 ldout(cct
,5) << __func__
<< " Hit timeout("
246 << timeout
.tv_sec
<< " sec "
247 << timeout
.tv_nsec
<< " nsec"
// Translate each returned kevent into a FiredFileEvent.
253 fired_events
.resize(numevents
);
254 for (j
= 0; j
< numevents
; j
++) {
256 struct kevent
*e
= res_events
+ j
;
// Read/write filters map to the matching EVENT_* bit; an entry flagged
// EV_ERROR is reported as both readable and writable.
258 if (e
->filter
== EVFILT_READ
) mask
|= EVENT_READABLE
;
259 if (e
->filter
== EVFILT_WRITE
) mask
|= EVENT_WRITABLE
;
260 if (e
->flags
& EV_ERROR
) mask
|= EVENT_READABLE
|EVENT_WRITABLE
;
// The kevent ident carries the file descriptor (see the EV_SET calls in
// add_event/del_event, which pass fd as ident).
261 fired_events
[j
].fd
= (int)e
->ident
;
262 fired_events
[j
].mask
= mask
;