/* lib/pullwr.c — FRRouting (mirror_frr) */
1 /*
2 * Pull-driven write event handler
3 * Copyright (C) 2019 David Lamparter
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
8 * any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20 #include "zebra.h"
21
22 #include "pullwr.h"
23 #include "memory.h"
24 #include "monotime.h"
25
26 /* defaults */
27 #define PULLWR_THRESH 16384 /* size at which we start to call write() */
28 #define PULLWR_MAXSPIN 2500 /* max µs to spend grabbing more data */
29
/* State for one pull-driven writer, attached to a single fd.
 * Data is buffered here and drained with writev(); when the buffer runs
 * low, the owner's fill() callback is invoked to pull more data in.
 */
struct pullwr {
	int fd;				/* non-blocking fd we write to */
	struct thread_master *tm;	/* event loop to schedule on */
	/* writer == NULL <=> we're idle */
	struct thread *writer;

	void *arg;			/* opaque cookie passed to callbacks */
	void (*fill)(void *, struct pullwr *);	/* pull more data into buffer */
	void (*err)(void *, struct pullwr *, bool);	/* write error / EOF (bool) */

	/* ring buffer (although it's "un-ringed" on resizing, it WILL wrap
	 * around if data is trickling in while keeping it at a constant size)
	 */
	size_t bufsz, valid, pos;	/* capacity, bytes queued, read offset */
	uint64_t total_written;		/* lifetime byte counter for stats */
	char *buffer;			/* NULL while idle & empty */

	size_t thresh;  /* PULLWR_THRESH */
	int64_t maxspin;  /* PULLWR_MAXSPIN */
};
50
51 DEFINE_MTYPE_STATIC(LIB, PULLWR_HEAD, "pull-driven write controller")
52 DEFINE_MTYPE_STATIC(LIB, PULLWR_BUF, "pull-driven write buffer")
53
54 static int pullwr_run(struct thread *t);
55
56 struct pullwr *_pullwr_new(struct thread_master *tm, int fd,
57 void *arg,
58 void (*fill)(void *, struct pullwr *),
59 void (*err)(void *, struct pullwr *, bool))
60 {
61 struct pullwr *pullwr;
62
63 pullwr = XCALLOC(MTYPE_PULLWR_HEAD, sizeof(*pullwr));
64 pullwr->fd = fd;
65 pullwr->tm = tm;
66 pullwr->arg = arg;
67 pullwr->fill = fill;
68 pullwr->err = err;
69
70 pullwr->thresh = PULLWR_THRESH;
71 pullwr->maxspin = PULLWR_MAXSPIN;
72
73 return pullwr;
74 }
75
76 void pullwr_del(struct pullwr *pullwr)
77 {
78 THREAD_OFF(pullwr->writer);
79
80 XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
81 XFREE(MTYPE_PULLWR_HEAD, pullwr);
82 }
83
84 void pullwr_cfg(struct pullwr *pullwr, int64_t max_spin_usec,
85 size_t write_threshold)
86 {
87 pullwr->maxspin = max_spin_usec ?: PULLWR_MAXSPIN;
88 pullwr->thresh = write_threshold ?: PULLWR_THRESH;
89 }
90
91 void pullwr_bump(struct pullwr *pullwr)
92 {
93 if (pullwr->writer)
94 return;
95
96 thread_add_timer(pullwr->tm, pullwr_run, pullwr, 0, &pullwr->writer);
97 }
98
99 static size_t pullwr_iov(struct pullwr *pullwr, struct iovec *iov)
100 {
101 size_t len1;
102
103 if (pullwr->valid == 0)
104 return 0;
105
106 if (pullwr->pos + pullwr->valid <= pullwr->bufsz) {
107 iov[0].iov_base = pullwr->buffer + pullwr->pos;
108 iov[0].iov_len = pullwr->valid;
109 return 1;
110 }
111
112 len1 = pullwr->bufsz - pullwr->pos;
113
114 iov[0].iov_base = pullwr->buffer + pullwr->pos;
115 iov[0].iov_len = len1;
116 iov[1].iov_base = pullwr->buffer;
117 iov[1].iov_len = pullwr->valid - len1;
118 return 2;
119 }
120
121 static void pullwr_resize(struct pullwr *pullwr, size_t need)
122 {
123 struct iovec iov[2];
124 size_t niov, newsize;
125 char *newbuf;
126
127 /* the buffer is maintained at pullwr->thresh * 2 since we'll be
128 * trying to fill it as long as it's anywhere below pullwr->thresh.
129 * That means we frequently end up a little short of it and then write
130 * something that goes over the threshold. So, just use double.
131 */
132 if (need) {
133 /* resize up */
134 if (pullwr->bufsz - pullwr->valid >= need)
135 return;
136
137 newsize = MAX((pullwr->valid + need) * 2, pullwr->thresh * 2);
138 newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
139 } else if (!pullwr->valid) {
140 /* resize down, buffer empty */
141 newsize = 0;
142 newbuf = NULL;
143 } else {
144 /* resize down */
145 if (pullwr->bufsz - pullwr->valid < pullwr->thresh)
146 return;
147 newsize = MAX(pullwr->valid, pullwr->thresh * 2);
148 newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
149 }
150
151 niov = pullwr_iov(pullwr, iov);
152 if (niov >= 1) {
153 memcpy(newbuf, iov[0].iov_base, iov[0].iov_len);
154 if (niov >= 2)
155 memcpy(newbuf + iov[0].iov_len,
156 iov[1].iov_base, iov[1].iov_len);
157 }
158
159 XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
160 pullwr->buffer = newbuf;
161 pullwr->bufsz = newsize;
162 pullwr->pos = 0;
163 }
164
165 void pullwr_write(struct pullwr *pullwr, const void *data, size_t len)
166 {
167 pullwr_resize(pullwr, len);
168
169 if (pullwr->pos + pullwr->valid > pullwr->bufsz) {
170 size_t pos;
171
172 pos = (pullwr->pos + pullwr->valid) % pullwr->bufsz;
173 memcpy(pullwr->buffer + pos, data, len);
174 } else {
175 size_t max1, len1;
176 max1 = pullwr->bufsz - (pullwr->pos + pullwr->valid);
177 max1 = MIN(max1, len);
178
179 memcpy(pullwr->buffer + pullwr->pos + pullwr->valid,
180 data, max1);
181 len1 = len - max1;
182
183 if (len1)
184 memcpy(pullwr->buffer, (char *)data + max1, len1);
185
186 }
187 pullwr->valid += len;
188
189 pullwr_bump(pullwr);
190 }
191
/* Main drain loop, run from the event loop (timer via pullwr_bump(), or
 * write-readiness on the fd).  Alternates between pulling data in via
 * fill() and pushing it out via writev(), until either the socket won't
 * take more (EAGAIN), there's nothing left to send, or we've spent
 * maxspin microseconds here.
 */
static int pullwr_run(struct thread *t)
{
	struct pullwr *pullwr = THREAD_ARG(t);
	struct iovec iov[2];
	size_t niov, lastvalid;
	ssize_t nwr;
	struct timeval t0;
	bool maxspun = false;

	monotime(&t0);

	do {
		/* valid - 1 != valid, so the fill loop below always runs at
		 * least once; lastvalid tracks whether fill() made progress
		 */
		lastvalid = pullwr->valid - 1;
		while (pullwr->valid < pullwr->thresh
				&& pullwr->valid != lastvalid
				&& !maxspun) {
			lastvalid = pullwr->valid;
			pullwr->fill(pullwr->arg, pullwr);

			/* check after doing at least one fill() call so we
			 * don't spin without making progress on slow boxes
			 */
			if (!maxspun && monotime_since(&t0, NULL)
					>= pullwr->maxspin)
				maxspun = true;
		}

		if (pullwr->valid == 0) {
			/* we made a fill() call above that didn't feed any
			 * data in, and we have nothing more queued, so we go
			 * into idle, i.e. no calling thread_add_write()
			 */
			pullwr_resize(pullwr, 0);
			return 0;
		}

		niov = pullwr_iov(pullwr, iov);
		assert(niov);

		nwr = writev(pullwr->fd, iov, niov);
		if (nwr < 0) {
			/* EAGAIN/EWOULDBLOCK: socket full, wait for
			 * writability below; anything else is a hard error
			 */
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break;
			pullwr->err(pullwr->arg, pullwr, false);
			return 0;
		}

		if (nwr == 0) {
			/* 0-byte write = peer closed; eof=true to err() */
			pullwr->err(pullwr->arg, pullwr, true);
			return 0;
		}

		/* consume the written bytes from the ring */
		pullwr->total_written += nwr;
		pullwr->valid -= nwr;
		pullwr->pos += nwr;
		pullwr->pos %= pullwr->bufsz;
	} while (pullwr->valid == 0 && !maxspun);
	/* pullwr->valid != 0 implies we did an incomplete write, i.e. socket
	 * is full and we go wait until it's available for writing again.
	 */

	thread_add_write(pullwr->tm, pullwr_run, pullwr, pullwr->fd,
			&pullwr->writer);

	/* if we hit the time limit, just keep the buffer, we'll probably need
	 * it anyway & another run is already coming up.
	 */
	if (!maxspun)
		pullwr_resize(pullwr, 0);
	return 0;
}
263
264 void pullwr_stats(struct pullwr *pullwr, uint64_t *total_written,
265 size_t *pending, size_t *kernel_pending)
266 {
267 int tmp;
268
269 *total_written = pullwr->total_written;
270 *pending = pullwr->valid;
271
272 if (ioctl(pullwr->fd, TIOCOUTQ, &tmp) != 0)
273 tmp = 0;
274 *kernel_pending = tmp;
275 }