/* lib/pullwr.c — FRR (mirror_frr.git); gitweb blame-export page header removed */
// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * Pull-driven write event handler
 * Copyright (C) 2019 David Lamparter
 */

#include "zebra.h"

#include "pullwr.h"
#include "memory.h"
#include "monotime.h"

/* defaults */
#define PULLWR_THRESH	16384	/* size at which we start to call write() */
#define PULLWR_MAXSPIN	2500	/* max µs to spend grabbing more data */
16
/* State for one pull-driven writer.  The owner supplies a fill() callback
 * that is invoked whenever the writer wants more data queued, and an err()
 * callback invoked on write failure or EOF.
 */
struct pullwr {
	/* fd being written to; EAGAIN is handled, so it is presumably
	 * nonblocking — confirm at the call sites that create it
	 */
	int fd;
	struct thread_master *tm;	/* event loop used for scheduling */
	/* writer == NULL <=> we're idle */
	struct thread *writer;

	void *arg;			/* opaque cookie passed to callbacks */
	void (*fill)(void *, struct pullwr *);
	void (*err)(void *, struct pullwr *, bool);

	/* ring buffer (although it's "un-ringed" on resizing, it WILL wrap
	 * around if data is trickling in while keeping it at a constant size)
	 */
	size_t bufsz, valid, pos;
	uint64_t total_written;		/* lifetime bytes handed to writev() */
	char *buffer;

	size_t thresh;			/* PULLWR_THRESH */
	int64_t maxspin;		/* PULLWR_MAXSPIN */
};
37
bf8d3d6a
DL
38DEFINE_MTYPE_STATIC(LIB, PULLWR_HEAD, "pull-driven write controller");
39DEFINE_MTYPE_STATIC(LIB, PULLWR_BUF, "pull-driven write buffer");
5c52c06c 40
cc9f21da 41static void pullwr_run(struct thread *t);
5c52c06c
DL
42
43struct pullwr *_pullwr_new(struct thread_master *tm, int fd,
44 void *arg,
45 void (*fill)(void *, struct pullwr *),
46 void (*err)(void *, struct pullwr *, bool))
47{
48 struct pullwr *pullwr;
49
50 pullwr = XCALLOC(MTYPE_PULLWR_HEAD, sizeof(*pullwr));
51 pullwr->fd = fd;
52 pullwr->tm = tm;
53 pullwr->arg = arg;
54 pullwr->fill = fill;
55 pullwr->err = err;
56
57 pullwr->thresh = PULLWR_THRESH;
58 pullwr->maxspin = PULLWR_MAXSPIN;
59
60 return pullwr;
61}
62
63void pullwr_del(struct pullwr *pullwr)
64{
65 THREAD_OFF(pullwr->writer);
66
67 XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
68 XFREE(MTYPE_PULLWR_HEAD, pullwr);
69}
70
71void pullwr_cfg(struct pullwr *pullwr, int64_t max_spin_usec,
72 size_t write_threshold)
73{
74 pullwr->maxspin = max_spin_usec ?: PULLWR_MAXSPIN;
75 pullwr->thresh = write_threshold ?: PULLWR_THRESH;
76}
77
78void pullwr_bump(struct pullwr *pullwr)
79{
80 if (pullwr->writer)
81 return;
82
83 thread_add_timer(pullwr->tm, pullwr_run, pullwr, 0, &pullwr->writer);
84}
85
86static size_t pullwr_iov(struct pullwr *pullwr, struct iovec *iov)
87{
88 size_t len1;
89
90 if (pullwr->valid == 0)
91 return 0;
92
93 if (pullwr->pos + pullwr->valid <= pullwr->bufsz) {
94 iov[0].iov_base = pullwr->buffer + pullwr->pos;
95 iov[0].iov_len = pullwr->valid;
96 return 1;
97 }
98
99 len1 = pullwr->bufsz - pullwr->pos;
100
101 iov[0].iov_base = pullwr->buffer + pullwr->pos;
102 iov[0].iov_len = len1;
103 iov[1].iov_base = pullwr->buffer;
104 iov[1].iov_len = pullwr->valid - len1;
105 return 2;
106}
107
108static void pullwr_resize(struct pullwr *pullwr, size_t need)
109{
110 struct iovec iov[2];
111 size_t niov, newsize;
112 char *newbuf;
113
114 /* the buffer is maintained at pullwr->thresh * 2 since we'll be
115 * trying to fill it as long as it's anywhere below pullwr->thresh.
116 * That means we frequently end up a little short of it and then write
117 * something that goes over the threshold. So, just use double.
118 */
119 if (need) {
120 /* resize up */
121 if (pullwr->bufsz - pullwr->valid >= need)
122 return;
123
124 newsize = MAX((pullwr->valid + need) * 2, pullwr->thresh * 2);
125 newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
126 } else if (!pullwr->valid) {
127 /* resize down, buffer empty */
128 newsize = 0;
129 newbuf = NULL;
130 } else {
131 /* resize down */
132 if (pullwr->bufsz - pullwr->valid < pullwr->thresh)
133 return;
134 newsize = MAX(pullwr->valid, pullwr->thresh * 2);
135 newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
136 }
137
138 niov = pullwr_iov(pullwr, iov);
139 if (niov >= 1) {
140 memcpy(newbuf, iov[0].iov_base, iov[0].iov_len);
141 if (niov >= 2)
142 memcpy(newbuf + iov[0].iov_len,
143 iov[1].iov_base, iov[1].iov_len);
144 }
145
146 XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
147 pullwr->buffer = newbuf;
148 pullwr->bufsz = newsize;
149 pullwr->pos = 0;
150}
151
152void pullwr_write(struct pullwr *pullwr, const void *data, size_t len)
153{
154 pullwr_resize(pullwr, len);
155
156 if (pullwr->pos + pullwr->valid > pullwr->bufsz) {
157 size_t pos;
158
159 pos = (pullwr->pos + pullwr->valid) % pullwr->bufsz;
160 memcpy(pullwr->buffer + pos, data, len);
161 } else {
162 size_t max1, len1;
163 max1 = pullwr->bufsz - (pullwr->pos + pullwr->valid);
164 max1 = MIN(max1, len);
165
166 memcpy(pullwr->buffer + pullwr->pos + pullwr->valid,
167 data, max1);
168 len1 = len - max1;
169
170 if (len1)
171 memcpy(pullwr->buffer, (char *)data + max1, len1);
172
173 }
174 pullwr->valid += len;
175
176 pullwr_bump(pullwr);
177}
178
cc9f21da 179static void pullwr_run(struct thread *t)
5c52c06c
DL
180{
181 struct pullwr *pullwr = THREAD_ARG(t);
182 struct iovec iov[2];
183 size_t niov, lastvalid;
184 ssize_t nwr;
185 struct timeval t0;
186 bool maxspun = false;
187
188 monotime(&t0);
189
190 do {
191 lastvalid = pullwr->valid - 1;
192 while (pullwr->valid < pullwr->thresh
193 && pullwr->valid != lastvalid
194 && !maxspun) {
195 lastvalid = pullwr->valid;
196 pullwr->fill(pullwr->arg, pullwr);
197
198 /* check after doing at least one fill() call so we
199 * don't spin without making progress on slow boxes
200 */
201 if (!maxspun && monotime_since(&t0, NULL)
202 >= pullwr->maxspin)
203 maxspun = true;
204 }
205
206 if (pullwr->valid == 0) {
207 /* we made a fill() call above that didn't feed any
208 * data in, and we have nothing more queued, so we go
209 * into idle, i.e. no calling thread_add_write()
210 */
211 pullwr_resize(pullwr, 0);
cc9f21da 212 return;
5c52c06c
DL
213 }
214
215 niov = pullwr_iov(pullwr, iov);
216 assert(niov);
217
218 nwr = writev(pullwr->fd, iov, niov);
219 if (nwr < 0) {
220 if (errno == EAGAIN || errno == EWOULDBLOCK)
221 break;
222 pullwr->err(pullwr->arg, pullwr, false);
cc9f21da 223 return;
5c52c06c
DL
224 }
225
226 if (nwr == 0) {
227 pullwr->err(pullwr->arg, pullwr, true);
cc9f21da 228 return;
5c52c06c
DL
229 }
230
231 pullwr->total_written += nwr;
232 pullwr->valid -= nwr;
233 pullwr->pos += nwr;
234 pullwr->pos %= pullwr->bufsz;
235 } while (pullwr->valid == 0 && !maxspun);
236 /* pullwr->valid != 0 implies we did an incomplete write, i.e. socket
237 * is full and we go wait until it's available for writing again.
238 */
239
240 thread_add_write(pullwr->tm, pullwr_run, pullwr, pullwr->fd,
241 &pullwr->writer);
242
243 /* if we hit the time limit, just keep the buffer, we'll probably need
244 * it anyway & another run is already coming up.
245 */
246 if (!maxspun)
247 pullwr_resize(pullwr, 0);
5c52c06c
DL
248}
249
250void pullwr_stats(struct pullwr *pullwr, uint64_t *total_written,
251 size_t *pending, size_t *kernel_pending)
252{
253 int tmp;
254
255 *total_written = pullwr->total_written;
256 *pending = pullwr->valid;
257
258 if (ioctl(pullwr->fd, TIOCOUTQ, &tmp) != 0)
259 tmp = 0;
260 *kernel_pending = tmp;
261}