]>
Commit | Line | Data |
---|---|---|
5c52c06c DL |
1 | /* |
2 | * Pull-driven write event handler | |
3 | * Copyright (C) 2019 David Lamparter | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify it | |
6 | * under the terms of the GNU General Public License as published by the Free | |
7 | * Software Foundation; either version 2 of the License, or (at your option) | |
8 | * any later version. | |
9 | * | |
10 | * This program is distributed in the hope that it will be useful, but WITHOUT | |
11 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | |
12 | * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for | |
13 | * more details. | |
14 | * | |
15 | * You should have received a copy of the GNU General Public License along | |
16 | * with this program; see the file COPYING; if not, write to the Free Software | |
17 | * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA | |
18 | */ | |
19 | ||
20 | #include "zebra.h" | |
21 | ||
22 | #include "pullwr.h" | |
23 | #include "memory.h" | |
24 | #include "monotime.h" | |
25 | ||
26 | /* defaults */ | |
27 | #define PULLWR_THRESH 16384 /* size at which we start to call write() */ | |
28 | #define PULLWR_MAXSPIN 2500 /* max µs to spend grabbing more data */ | |
29 | ||
/* Per-socket state of one pull-driven writer.
 * Allocated by _pullwr_new(), released by pullwr_del().
 */
struct pullwr {
	int fd;				/* socket we writev() to */
	struct thread_master *tm;	/* event loop used for scheduling */
	/* writer == NULL <=> we're idle */
	struct thread *writer;

	void *arg;			/* opaque context handed to callbacks */
	/* pulls more data from the owner; expected to queue it via
	 * pullwr_write()
	 */
	void (*fill)(void *, struct pullwr *);
	/* write failure callback; the bool is true for a zero-length
	 * write (see pullwr_run), false for a hard writev() error
	 */
	void (*err)(void *, struct pullwr *, bool);

	/* ring buffer (although it's "un-ringed" on resizing, it WILL wrap
	 * around if data is trickling in while keeping it at a constant size)
	 */
	size_t bufsz, valid, pos;	/* capacity, queued bytes, read offset */
	uint64_t total_written;		/* lifetime counter for pullwr_stats() */
	char *buffer;			/* NULL while bufsz == 0 */

	size_t thresh;	/* PULLWR_THRESH */
	int64_t maxspin;	/* PULLWR_MAXSPIN */
};
50 | ||
DEFINE_MTYPE_STATIC(LIB, PULLWR_HEAD, "pull-driven write controller");
DEFINE_MTYPE_STATIC(LIB, PULLWR_BUF, "pull-driven write buffer");

/* forward declaration: pullwr_run() is scheduled both as an immediate
 * timer (pullwr_bump) and as a socket-writable event (end of itself)
 */
static void pullwr_run(struct thread *t);

56 | struct pullwr *_pullwr_new(struct thread_master *tm, int fd, | |
57 | void *arg, | |
58 | void (*fill)(void *, struct pullwr *), | |
59 | void (*err)(void *, struct pullwr *, bool)) | |
60 | { | |
61 | struct pullwr *pullwr; | |
62 | ||
63 | pullwr = XCALLOC(MTYPE_PULLWR_HEAD, sizeof(*pullwr)); | |
64 | pullwr->fd = fd; | |
65 | pullwr->tm = tm; | |
66 | pullwr->arg = arg; | |
67 | pullwr->fill = fill; | |
68 | pullwr->err = err; | |
69 | ||
70 | pullwr->thresh = PULLWR_THRESH; | |
71 | pullwr->maxspin = PULLWR_MAXSPIN; | |
72 | ||
73 | return pullwr; | |
74 | } | |
75 | ||
/* Tear down a writer: cancel any pending event, then free the buffer
 * and the controller itself.  Note the fd is NOT closed here; it stays
 * the caller's responsibility.
 */
void pullwr_del(struct pullwr *pullwr)
{
	/* cancel pending timer/write event, if scheduled */
	THREAD_OFF(pullwr->writer);

	/* buffer may be NULL (never grown, or shrunk to empty) */
	XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
	XFREE(MTYPE_PULLWR_HEAD, pullwr);
}
83 | ||
84 | void pullwr_cfg(struct pullwr *pullwr, int64_t max_spin_usec, | |
85 | size_t write_threshold) | |
86 | { | |
87 | pullwr->maxspin = max_spin_usec ?: PULLWR_MAXSPIN; | |
88 | pullwr->thresh = write_threshold ?: PULLWR_THRESH; | |
89 | } | |
90 | ||
91 | void pullwr_bump(struct pullwr *pullwr) | |
92 | { | |
93 | if (pullwr->writer) | |
94 | return; | |
95 | ||
96 | thread_add_timer(pullwr->tm, pullwr_run, pullwr, 0, &pullwr->writer); | |
97 | } | |
98 | ||
/* Describe the buffer's queued data as iovecs for writev().
 * Fills at most 2 entries in iov: one for the contiguous case, two when
 * the valid region wraps past the end of the ring buffer.
 * Returns the number of entries filled; 0 if nothing is queued.
 */
static size_t pullwr_iov(struct pullwr *pullwr, struct iovec *iov)
{
	size_t len1;

	if (pullwr->valid == 0)
		return 0;

	/* contiguous: [pos, pos + valid) fits without wrapping */
	if (pullwr->pos + pullwr->valid <= pullwr->bufsz) {
		iov[0].iov_base = pullwr->buffer + pullwr->pos;
		iov[0].iov_len = pullwr->valid;
		return 1;
	}

	/* wrapped: tail segment up to end of buffer first ... */
	len1 = pullwr->bufsz - pullwr->pos;

	iov[0].iov_base = pullwr->buffer + pullwr->pos;
	iov[0].iov_len = len1;
	/* ... then the remainder from the start of the buffer */
	iov[1].iov_base = pullwr->buffer;
	iov[1].iov_len = pullwr->valid - len1;
	return 2;
}
120 | ||
/* Grow or shrink the ring buffer.
 *
 * need > 0:  ensure at least "need" more bytes of free space (grow)
 * need == 0: opportunistic shrink; frees the buffer entirely when no
 *            data is queued, otherwise shrinks only if free space has
 *            grown past the threshold
 *
 * Any queued data is linearized to the front of the new buffer, so
 * pos is reset to 0 afterwards.
 */
static void pullwr_resize(struct pullwr *pullwr, size_t need)
{
	struct iovec iov[2];
	size_t niov, newsize;
	char *newbuf;

	/* the buffer is maintained at pullwr->thresh * 2 since we'll be
	 * trying to fill it as long as it's anywhere below pullwr->thresh.
	 * That means we frequently end up a little short of it and then write
	 * something that goes over the threshold. So, just use double.
	 */
	if (need) {
		/* resize up */
		if (pullwr->bufsz - pullwr->valid >= need)
			return;

		newsize = MAX((pullwr->valid + need) * 2, pullwr->thresh * 2);
		newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
	} else if (!pullwr->valid) {
		/* resize down, buffer empty */
		newsize = 0;
		newbuf = NULL;
	} else {
		/* resize down */
		if (pullwr->bufsz - pullwr->valid < pullwr->thresh)
			return;
		newsize = MAX(pullwr->valid, pullwr->thresh * 2);
		newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
	}

	/* copy out the queued data (up to 2 segments if wrapped);
	 * niov == 0 when empty, so a NULL newbuf is never written to
	 */
	niov = pullwr_iov(pullwr, iov);
	if (niov >= 1) {
		memcpy(newbuf, iov[0].iov_base, iov[0].iov_len);
		if (niov >= 2)
			memcpy(newbuf + iov[0].iov_len,
			       iov[1].iov_base, iov[1].iov_len);
	}

	XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
	pullwr->buffer = newbuf;
	pullwr->bufsz = newsize;
	pullwr->pos = 0;
}
164 | ||
/* Append len bytes to the write buffer and make sure a write event is
 * scheduled.  Data is copied immediately; the caller's pointer is not
 * retained.  Grows the buffer as needed.
 */
void pullwr_write(struct pullwr *pullwr, const void *data, size_t len)
{
	/* ensure at least len bytes of free space */
	pullwr_resize(pullwr, len);

	if (pullwr->pos + pullwr->valid > pullwr->bufsz) {
		/* valid region already wraps; the free space is one
		 * contiguous chunk starting right after the wrapped end
		 */
		size_t pos;

		pos = (pullwr->pos + pullwr->valid) % pullwr->bufsz;
		memcpy(pullwr->buffer + pos, data, len);
	} else {
		/* free space may wrap: fill up to the end of the buffer
		 * (max1 bytes) first, then put the rest (len1 bytes) at
		 * the start
		 */
		size_t max1, len1;
		max1 = pullwr->bufsz - (pullwr->pos + pullwr->valid);
		max1 = MIN(max1, len);

		memcpy(pullwr->buffer + pullwr->pos + pullwr->valid,
		       data, max1);
		len1 = len - max1;

		if (len1)
			memcpy(pullwr->buffer, (char *)data + max1, len1);

	}
	pullwr->valid += len;

	/* schedule pullwr_run() if we're currently idle */
	pullwr_bump(pullwr);
}
191 | ||
/* Main event handler: repeatedly pull data via fill() until we're at
 * the write threshold (or fill() stops producing, or the spin-time
 * budget runs out), then flush with writev().  Re-schedules itself as a
 * socket-writable event unless everything was written and fill()
 * produced nothing more — in that case the writer goes idle.
 * Runs both from the pullwr_bump() timer and from write events.
 */
static void pullwr_run(struct thread *t)
{
	struct pullwr *pullwr = THREAD_ARG(t);
	struct iovec iov[2];
	size_t niov, lastvalid;
	ssize_t nwr;
	struct timeval t0;
	bool maxspun = false;

	monotime(&t0);

	do {
		/* start lastvalid != valid so the loop below runs at
		 * least once
		 */
		lastvalid = pullwr->valid - 1;
		/* pull until threshold reached, fill() made no progress
		 * (valid unchanged), or the time budget is used up
		 */
		while (pullwr->valid < pullwr->thresh
		       && pullwr->valid != lastvalid
		       && !maxspun) {
			lastvalid = pullwr->valid;
			pullwr->fill(pullwr->arg, pullwr);

			/* check after doing at least one fill() call so we
			 * don't spin without making progress on slow boxes
			 */
			if (!maxspun && monotime_since(&t0, NULL)
					>= pullwr->maxspin)
				maxspun = true;
		}

		if (pullwr->valid == 0) {
			/* we made a fill() call above that didn't feed any
			 * data in, and we have nothing more queued, so we go
			 * into idle, i.e. no calling thread_add_write()
			 */
			pullwr_resize(pullwr, 0);
			return;
		}

		niov = pullwr_iov(pullwr, iov);
		assert(niov);

		nwr = writev(pullwr->fd, iov, niov);
		if (nwr < 0) {
			/* socket full: break out and wait for writable */
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break;
			/* hard error: report with bool arg false */
			pullwr->err(pullwr->arg, pullwr, false);
			return;
		}

		if (nwr == 0) {
			/* zero-length write: reported separately via the
			 * err() callback's bool arg set to true
			 */
			pullwr->err(pullwr->arg, pullwr, true);
			return;
		}

		/* account for what went out and advance the read offset
		 * (wrapping around the ring)
		 */
		pullwr->total_written += nwr;
		pullwr->valid -= nwr;
		pullwr->pos += nwr;
		pullwr->pos %= pullwr->bufsz;
	} while (pullwr->valid == 0 && !maxspun);
	/* pullwr->valid != 0 implies we did an incomplete write, i.e. socket
	 * is full and we go wait until it's available for writing again.
	 */

	thread_add_write(pullwr->tm, pullwr_run, pullwr, pullwr->fd,
			 &pullwr->writer);

	/* if we hit the time limit, just keep the buffer, we'll probably need
	 * it anyway & another run is already coming up.
	 */
	if (!maxspun)
		pullwr_resize(pullwr, 0);
}
262 | ||
263 | void pullwr_stats(struct pullwr *pullwr, uint64_t *total_written, | |
264 | size_t *pending, size_t *kernel_pending) | |
265 | { | |
266 | int tmp; | |
267 | ||
268 | *total_written = pullwr->total_written; | |
269 | *pending = pullwr->valid; | |
270 | ||
271 | if (ioctl(pullwr->fd, TIOCOUTQ, &tmp) != 0) | |
272 | tmp = 0; | |
273 | *kernel_pending = tmp; | |
274 | } |