]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include "common/errno.h" | |
18 | #include "EventKqueue.h" | |
19 | ||
20 | #define dout_subsys ceph_subsys_ms | |
21 | ||
22 | #undef dout_prefix | |
23 | #define dout_prefix *_dout << "KqueueDriver." | |
24 | ||
25 | #define KEVENT_NOWAIT 0 | |
26 | ||
27 | int KqueueDriver::test_kqfd() { | |
28 | struct kevent ke[1]; | |
29 | if (kevent(kqfd, ke, 0, NULL, 0, KEVENT_NOWAIT) == -1) { | |
30 | ldout(cct,0) << __func__ << " invalid kqfd = " << kqfd | |
31 | << cpp_strerror(errno) << dendl; | |
32 | return -errno; | |
33 | } | |
34 | return kqfd; | |
35 | } | |
36 | ||
37 | int KqueueDriver::restore_events() { | |
38 | struct kevent ke[2]; | |
39 | int i; | |
40 | ||
41 | ldout(cct,30) << __func__ << " on kqfd = " << kqfd << dendl; | |
42 | for(i=0;i<size;i++) { | |
43 | int num = 0; | |
44 | if (sav_events[i].mask == 0 ) | |
45 | continue; | |
46 | ldout(cct,30) << __func__ << " restore kqfd = " << kqfd | |
47 | << " fd = " << i << " mask " << sav_events[i].mask << dendl; | |
48 | if (sav_events[i].mask & EVENT_READABLE) | |
49 | EV_SET(&ke[num++], i, EVFILT_READ, EV_ADD, 0, 0, NULL); | |
50 | if (sav_events[i].mask & EVENT_WRITABLE) | |
51 | EV_SET(&ke[num++], i, EVFILT_WRITE, EV_ADD, 0, 0, NULL); | |
52 | if (num) { | |
53 | if (kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT) == -1) { | |
54 | ldout(cct,0) << __func__ << " unable to add event: " | |
55 | << cpp_strerror(errno) << dendl; | |
56 | return -errno; | |
57 | } | |
58 | } | |
59 | } | |
60 | return 0; | |
61 | } | |
62 | ||
63 | int KqueueDriver::test_thread_change(const char* funcname) { | |
64 | // check to see if we changed thread, because that invalidates | |
65 | // the kqfd and we need to restore that | |
66 | int oldkqfd = kqfd; | |
67 | ||
68 | if (!pthread_equal(mythread, pthread_self())) { | |
69 | ldout(cct,20) << funcname << " We changed thread from " << mythread | |
70 | << " to " << pthread_self() << dendl; | |
71 | mythread = pthread_self(); | |
72 | kqfd = -1; | |
73 | } else if ((kqfd != -1) && (test_kqfd() < 0)) { | |
74 | // should this ever happen? | |
75 | // It would be strange to change kqfd with thread change. | |
11fdf7f2 | 76 | // Might nee to change this into an ceph_assert() in the future. |
7c673cae FG |
77 | ldout(cct,0) << funcname << " Warning: Recreating old kqfd. " |
78 | << "This should not happen!!!" << dendl; | |
79 | kqfd = -1; | |
80 | } | |
81 | if (kqfd == -1) { | |
82 | kqfd = kqueue(); | |
83 | ldout(cct,30) << funcname << " kqueue: new kqfd = " << kqfd | |
84 | << " (was: " << oldkqfd << ")" | |
85 | << dendl; | |
86 | if (kqfd < 0) { | |
87 | lderr(cct) << funcname << " unable to do kqueue: " | |
88 | << cpp_strerror(errno) << dendl; | |
89 | return -errno; | |
90 | } | |
91 | if (restore_events()< 0) { | |
92 | lderr(cct) << funcname << " unable restore all events " | |
93 | << cpp_strerror(errno) << dendl; | |
94 | return -errno; | |
95 | } | |
96 | } | |
97 | return 0; | |
98 | } | |
99 | ||
100 | int KqueueDriver::init(EventCenter *c, int nevent) | |
101 | { | |
102 | // keep track of possible changes of our thread | |
103 | // because change of thread kills the kqfd | |
104 | mythread = pthread_self(); | |
105 | ||
106 | // Reserve the space to accept the kevent return events. | |
107 | res_events = (struct kevent*)malloc(sizeof(struct kevent)*nevent); | |
108 | if (!res_events) { | |
109 | lderr(cct) << __func__ << " unable to malloc memory: " | |
110 | << cpp_strerror(errno) << dendl; | |
111 | return -ENOMEM; | |
112 | } | |
113 | memset(res_events, 0, sizeof(struct kevent)*nevent); | |
114 | size = nevent; | |
115 | ||
116 | // Reserve the space to keep all of the events set, so it can be redone | |
117 | // when we change trhread ID. | |
118 | sav_events = (struct SaveEvent*)malloc(sizeof(struct SaveEvent)*nevent); | |
119 | if (!sav_events) { | |
120 | lderr(cct) << __func__ << " unable to malloc memory: " | |
121 | << cpp_strerror(errno) << dendl; | |
122 | return -ENOMEM; | |
123 | } | |
124 | memset(sav_events, 0, sizeof(struct SaveEvent)*nevent); | |
125 | sav_max = nevent; | |
126 | ||
127 | // Delay assigning a descriptor until it is really needed. | |
128 | // kqfd = kqueue(); | |
129 | kqfd = -1; | |
130 | return 0; | |
131 | } | |
132 | ||
133 | int KqueueDriver::add_event(int fd, int cur_mask, int add_mask) | |
134 | { | |
135 | struct kevent ke[2]; | |
136 | int num = 0; | |
137 | ||
138 | ldout(cct,30) << __func__ << " add event kqfd = " << kqfd << " fd = " << fd | |
139 | << " cur_mask = " << cur_mask << " add_mask = " << add_mask | |
140 | << dendl; | |
141 | ||
142 | int r = test_thread_change(__func__); | |
143 | if ( r < 0 ) | |
144 | return r; | |
145 | ||
146 | if (add_mask & EVENT_READABLE) | |
147 | EV_SET(&ke[num++], fd, EVFILT_READ, EV_ADD|EV_CLEAR, 0, 0, NULL); | |
148 | if (add_mask & EVENT_WRITABLE) | |
149 | EV_SET(&ke[num++], fd, EVFILT_WRITE, EV_ADD|EV_CLEAR, 0, 0, NULL); | |
150 | ||
151 | if (num) { | |
152 | if (kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT) == -1) { | |
153 | lderr(cct) << __func__ << " unable to add event: " | |
154 | << cpp_strerror(errno) << dendl; | |
155 | return -errno; | |
156 | } | |
157 | } | |
158 | // keep what we set | |
159 | if (fd >= sav_max) | |
160 | resize_events(sav_max+5000); | |
161 | sav_events[fd].mask = cur_mask | add_mask; | |
162 | return 0; | |
163 | } | |
164 | ||
165 | int KqueueDriver::del_event(int fd, int cur_mask, int del_mask) | |
166 | { | |
167 | struct kevent ke[2]; | |
168 | int num = 0; | |
169 | int mask = cur_mask & del_mask; | |
170 | ||
171 | ldout(cct,30) << __func__ << " delete event kqfd = " << kqfd | |
172 | << " fd = " << fd << " cur_mask = " << cur_mask | |
173 | << " del_mask = " << del_mask << dendl; | |
174 | ||
175 | int r = test_thread_change(__func__); | |
176 | if ( r < 0 ) | |
177 | return r; | |
178 | ||
179 | if (mask & EVENT_READABLE) | |
180 | EV_SET(&ke[num++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); | |
181 | if (mask & EVENT_WRITABLE) | |
182 | EV_SET(&ke[num++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); | |
183 | ||
184 | if (num) { | |
185 | int r = 0; | |
186 | if ((r = kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT)) < 0) { | |
187 | lderr(cct) << __func__ << " kevent: delete fd=" << fd << " mask=" << mask | |
188 | << " failed." << cpp_strerror(errno) << dendl; | |
189 | return -errno; | |
190 | } | |
191 | } | |
192 | // keep the administration | |
193 | sav_events[fd].mask = cur_mask & ~del_mask; | |
194 | return 0; | |
195 | } | |
196 | ||
197 | int KqueueDriver::resize_events(int newsize) | |
198 | { | |
199 | ldout(cct,30) << __func__ << " kqfd = " << kqfd << "newsize = " << newsize | |
200 | << dendl; | |
11fdf7f2 TL |
201 | if (newsize > sav_max) { |
202 | sav_events = (struct SaveEvent*)realloc(sav_events, sizeof(struct SaveEvent)*newsize); | |
7c673cae FG |
203 | if (!sav_events) { |
204 | lderr(cct) << __func__ << " unable to realloc memory: " | |
205 | << cpp_strerror(errno) << dendl; | |
11fdf7f2 | 206 | ceph_assert(sav_events); |
7c673cae FG |
207 | return -ENOMEM; |
208 | } | |
209 | memset(&sav_events[size], 0, sizeof(struct SaveEvent)*(newsize-sav_max)); | |
210 | sav_max = newsize; | |
211 | } | |
212 | return 0; | |
213 | } | |
214 | ||
20effc67 TL |
215 | int KqueueDriver::event_wait(std::vector<FiredFileEvent> &fired_events, |
216 | struct timeval *tvp) | |
7c673cae FG |
217 | { |
218 | int retval, numevents = 0; | |
219 | struct timespec timeout; | |
220 | ||
221 | ldout(cct,10) << __func__ << " kqfd = " << kqfd << dendl; | |
222 | ||
223 | int r = test_thread_change(__func__); | |
224 | if ( r < 0 ) | |
225 | return r; | |
226 | ||
227 | if (tvp != NULL) { | |
228 | timeout.tv_sec = tvp->tv_sec; | |
229 | timeout.tv_nsec = tvp->tv_usec * 1000; | |
230 | ldout(cct,20) << __func__ << " " | |
231 | << timeout.tv_sec << " sec " | |
232 | << timeout.tv_nsec << " nsec" | |
233 | << dendl; | |
234 | retval = kevent(kqfd, NULL, 0, res_events, size, &timeout); | |
235 | } else { | |
236 | ldout(cct,30) << __func__ << " event_wait: " << " NULL" << dendl; | |
237 | retval = kevent(kqfd, NULL, 0, res_events, size, KEVENT_NOWAIT); | |
238 | } | |
239 | ||
240 | ldout(cct,25) << __func__ << " kevent retval: " << retval << dendl; | |
241 | if (retval < 0) { | |
242 | lderr(cct) << __func__ << " kqueue error: " | |
243 | << cpp_strerror(errno) << dendl; | |
244 | return -errno; | |
245 | } else if (retval == 0) { | |
246 | ldout(cct,5) << __func__ << " Hit timeout(" | |
247 | << timeout.tv_sec << " sec " | |
248 | << timeout.tv_nsec << " nsec" | |
249 | << ")." << dendl; | |
250 | } else { | |
251 | int j; | |
252 | ||
253 | numevents = retval; | |
254 | fired_events.resize(numevents); | |
255 | for (j = 0; j < numevents; j++) { | |
256 | int mask = 0; | |
257 | struct kevent *e = res_events + j; | |
258 | ||
259 | if (e->filter == EVFILT_READ) mask |= EVENT_READABLE; | |
260 | if (e->filter == EVFILT_WRITE) mask |= EVENT_WRITABLE; | |
261 | if (e->flags & EV_ERROR) mask |= EVENT_READABLE|EVENT_WRITABLE; | |
262 | fired_events[j].fd = (int)e->ident; | |
263 | fired_events[j].mask = mask; | |
264 | ||
265 | } | |
266 | } | |
267 | return numevents; | |
268 | } |