// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * Pull-driven write event handler
 * Copyright (C) 2019 David Lamparter
 */

#include "zebra.h"

#include "pullwr.h"
#include "memory.h"
#include "monotime.h"

/* defaults */
#define PULLWR_THRESH	16384	/* size at which we start to call write() */
#define PULLWR_MAXSPIN	2500	/* max µs to spend grabbing more data */

struct pullwr {
	int fd;
	struct event_master *tm;
	/* writer == NULL <=> we're idle */
	struct event *writer;

	void *arg;
	void (*fill)(void *, struct pullwr *);
	void (*err)(void *, struct pullwr *, bool);

	/* ring buffer (although it's "un-ringed" on resizing, it WILL wrap
	 * around if data is trickling in while keeping it at a constant size)
	 */
	size_t bufsz, valid, pos;
	uint64_t total_written;
	char *buffer;

	size_t thresh;		/* PULLWR_THRESH */
	int64_t maxspin;	/* PULLWR_MAXSPIN */
};

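/* The "pull" in pull-driven: rather than consumers pushing data into an
 * unbounded output buffer, the event loop pulls data from the consumer.
 * Whenever the fd becomes writable (or pullwr_bump() kicks things off),
 * pullwr_run() below calls the fill() callback to request more data until
 * the buffer reaches ->thresh, flushes it out, and goes idle once fill()
 * stops producing.  Steady-state buffering thus stays around ->thresh
 * instead of growing with however much the application wants to queue.
 */
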
DEFINE_MTYPE_STATIC(LIB, PULLWR_HEAD, "pull-driven write controller");
DEFINE_MTYPE_STATIC(LIB, PULLWR_BUF, "pull-driven write buffer");

static void pullwr_run(struct event *t);

struct pullwr *_pullwr_new(struct event_master *tm, int fd, void *arg,
			   void (*fill)(void *, struct pullwr *),
			   void (*err)(void *, struct pullwr *, bool))
{
	struct pullwr *pullwr;

	pullwr = XCALLOC(MTYPE_PULLWR_HEAD, sizeof(*pullwr));
	pullwr->fd = fd;
	pullwr->tm = tm;
	pullwr->arg = arg;
	pullwr->fill = fill;
	pullwr->err = err;

	pullwr->thresh = PULLWR_THRESH;
	pullwr->maxspin = PULLWR_MAXSPIN;

	return pullwr;
}

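/* Usage sketch (illustrative only -- "struct myconn", conn_fill() and
 * conn_err() are hypothetical caller-side names, not part of this file):
 *
 *	static void conn_fill(void *arg, struct pullwr *pw)
 *	{
 *		struct myconn *conn = arg;
 *
 *		// hand over whatever is ready; not writing anything here
 *		// signals "nothing more" and lets the writer go idle
 *		if (conn->pending_len)
 *			pullwr_write(pw, conn->pending, conn->pending_len);
 *	}
 *
 *	static void conn_err(void *arg, struct pullwr *pw, bool eof)
 *	{
 *		// eof == true: write() returned 0; false: hard error
 *	}
 *
 *	pw = _pullwr_new(tm, fd, conn, conn_fill, conn_err);
 *	pullwr_bump(pw);	// schedule the first pullwr_run()
 */
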
void pullwr_del(struct pullwr *pullwr)
{
	EVENT_OFF(pullwr->writer);

	XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
	XFREE(MTYPE_PULLWR_HEAD, pullwr);
}

/* passing 0 for either value restores the built-in default (GNU "?:") */
void pullwr_cfg(struct pullwr *pullwr, int64_t max_spin_usec,
		size_t write_threshold)
{
	pullwr->maxspin = max_spin_usec ?: PULLWR_MAXSPIN;
	pullwr->thresh = write_threshold ?: PULLWR_THRESH;
}

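/* Kick the writer if it's idle: a zero-delay timer event runs pullwr_run()
 * on the next event loop pass, regardless of whether the fd is currently
 * writable.  If a writer event is already scheduled, there's nothing to do.
 */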
void pullwr_bump(struct pullwr *pullwr)
{
	if (pullwr->writer)
		return;

	event_add_timer(pullwr->tm, pullwr_run, pullwr, 0, &pullwr->writer);
}

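/* Map the valid region of the ring buffer onto at most two iovecs for
 * writev().  Worked example: with bufsz = 8, pos = 6, valid = 5 the data
 * wraps, so iov[0] covers buffer[6..7] (len1 = 2) and iov[1] covers
 * buffer[0..2] (valid - len1 = 3).  Returns the number of iovecs filled
 * in, 0 if the buffer is empty.
 */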
static size_t pullwr_iov(struct pullwr *pullwr, struct iovec *iov)
{
	size_t len1;

	if (pullwr->valid == 0)
		return 0;

	if (pullwr->pos + pullwr->valid <= pullwr->bufsz) {
		iov[0].iov_base = pullwr->buffer + pullwr->pos;
		iov[0].iov_len = pullwr->valid;
		return 1;
	}

	len1 = pullwr->bufsz - pullwr->pos;

	iov[0].iov_base = pullwr->buffer + pullwr->pos;
	iov[0].iov_len = len1;
	iov[1].iov_base = pullwr->buffer;
	iov[1].iov_len = pullwr->valid - len1;
	return 2;
}

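/* Adjust the buffer to current demand.  Three cases:
 *   - need > 0: grow so at least `need` more bytes fit (no-op if they
 *     already do), to MAX((valid + need) * 2, thresh * 2)
 *   - need == 0, buffer empty: release the buffer entirely
 *   - need == 0, data pending: shrink back to MAX(valid, thresh * 2), but
 *     only when at least thresh bytes of slack would be reclaimed
 * Pending data is copied to the front of the new buffer, un-wrapping it.
 */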
static void pullwr_resize(struct pullwr *pullwr, size_t need)
{
	struct iovec iov[2];
	size_t niov, newsize;
	char *newbuf;

	/* the buffer is maintained at pullwr->thresh * 2 since we'll be
	 * trying to fill it as long as it's anywhere below pullwr->thresh.
	 * That means we frequently end up a little short of it and then write
	 * something that goes over the threshold.  So, just use double.
	 */
	if (need) {
		/* resize up */
		if (pullwr->bufsz - pullwr->valid >= need)
			return;

		newsize = MAX((pullwr->valid + need) * 2, pullwr->thresh * 2);
		newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
	} else if (!pullwr->valid) {
		/* resize down, buffer empty */
		newsize = 0;
		newbuf = NULL;
	} else {
		/* resize down */
		if (pullwr->bufsz - pullwr->valid < pullwr->thresh)
			return;
		newsize = MAX(pullwr->valid, pullwr->thresh * 2);
		newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
	}

	niov = pullwr_iov(pullwr, iov);
	if (niov >= 1) {
		memcpy(newbuf, iov[0].iov_base, iov[0].iov_len);
		if (niov >= 2)
			memcpy(newbuf + iov[0].iov_len,
			       iov[1].iov_base, iov[1].iov_len);
	}

	XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
	pullwr->buffer = newbuf;
	pullwr->bufsz = newsize;
	pullwr->pos = 0;
}

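/* Append len bytes to the ring buffer; typically called from inside the
 * fill() callback.  pullwr_resize() guarantees enough free space first.
 * If the valid region already wraps, the free space is one contiguous
 * chunk in the middle of the buffer; otherwise the copy may itself need
 * to split in two pieces around the end of the buffer.
 */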
void pullwr_write(struct pullwr *pullwr, const void *data, size_t len)
{
	pullwr_resize(pullwr, len);

	if (pullwr->pos + pullwr->valid > pullwr->bufsz) {
		size_t pos;

		pos = (pullwr->pos + pullwr->valid) % pullwr->bufsz;
		memcpy(pullwr->buffer + pos, data, len);
	} else {
		size_t max1, len1;

		max1 = pullwr->bufsz - (pullwr->pos + pullwr->valid);
		max1 = MIN(max1, len);

		memcpy(pullwr->buffer + pullwr->pos + pullwr->valid,
		       data, max1);
		len1 = len - max1;

		if (len1)
			memcpy(pullwr->buffer, (char *)data + max1, len1);
	}
	pullwr->valid += len;

	pullwr_bump(pullwr);
}

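/* Main work loop, invoked as timer or write event.  Pulls data by calling
 * fill() until the buffer reaches ->thresh, fill() stops producing, or
 * ->maxspin microseconds have elapsed, then flushes with writev().  On a
 * hard error, or on write() returning 0, it hands off to the err()
 * callback; if fill() produced nothing and the buffer drained, it goes
 * idle (no event scheduled) until the next pullwr_bump()/pullwr_write().
 * In all other cases -- EAGAIN, a partial write, or hitting the spin
 * limit -- it re-arms itself as a write event on the fd.
 */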
static void pullwr_run(struct event *t)
{
	struct pullwr *pullwr = EVENT_ARG(t);
	struct iovec iov[2];
	size_t niov, lastvalid;
	ssize_t nwr;
	struct timeval t0;
	bool maxspun = false;

	monotime(&t0);

	do {
		lastvalid = pullwr->valid - 1;
		while (pullwr->valid < pullwr->thresh
		       && pullwr->valid != lastvalid
		       && !maxspun) {
			lastvalid = pullwr->valid;
			pullwr->fill(pullwr->arg, pullwr);

			/* check after doing at least one fill() call so we
			 * don't spin without making progress on slow boxes
			 */
			if (!maxspun && monotime_since(&t0, NULL)
					>= pullwr->maxspin)
				maxspun = true;
		}

		if (pullwr->valid == 0) {
			/* we made a fill() call above that didn't feed any
			 * data in, and we have nothing more queued, so we go
			 * into idle, i.e. no calling event_add_write()
			 */
			pullwr_resize(pullwr, 0);
			return;
		}

		niov = pullwr_iov(pullwr, iov);
		assert(niov);

		nwr = writev(pullwr->fd, iov, niov);
		if (nwr < 0) {
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break;
			pullwr->err(pullwr->arg, pullwr, false);
			return;
		}

		if (nwr == 0) {
			pullwr->err(pullwr->arg, pullwr, true);
			return;
		}

		pullwr->total_written += nwr;
		pullwr->valid -= nwr;
		pullwr->pos += nwr;
		pullwr->pos %= pullwr->bufsz;
	} while (pullwr->valid == 0 && !maxspun);
	/* pullwr->valid != 0 implies we did an incomplete write, i.e. the
	 * socket is full and we go wait until it's available for writing
	 * again.
	 */

	event_add_write(pullwr->tm, pullwr_run, pullwr, pullwr->fd,
			&pullwr->writer);

	/* if we hit the time limit, just keep the buffer, we'll probably need
	 * it anyway & another run is already coming up.
	 */
	if (!maxspun)
		pullwr_resize(pullwr, 0);
}

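/* Export counters: total bytes written out so far, bytes still pending in
 * our buffer, and -- where the TIOCOUTQ ioctl is supported (e.g. Linux
 * sockets, where it reports unsent bytes in the kernel send queue) --
 * bytes pending inside the kernel.  If the ioctl fails, kernel_pending
 * reads as 0.
 */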
void pullwr_stats(struct pullwr *pullwr, uint64_t *total_written,
		  size_t *pending, size_t *kernel_pending)
{
	int tmp;

	*total_written = pullwr->total_written;
	*pending = pullwr->valid;

	if (ioctl(pullwr->fd, TIOCOUTQ, &tmp) != 0)
		tmp = 0;
	*kernel_pending = tmp;
}